Skip to content

Commit

Permalink
feat(qrm): fix vpa resize comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nightmeng committed Aug 20, 2024
1 parent 747145e commit fd505f6
Show file tree
Hide file tree
Showing 16 changed files with 237 additions and 295 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/google/uuid v1.3.0
github.com/h2non/gock v1.2.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.5.1-0.20240816033534-930640a4d608
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.1-0.20240816033534-930640a4d608 h1:9e+TkjTyn4IKvtcqJaxxsiHQkx0DyKbrWcPf3zcbUhY=
github.com/kubewharf/katalyst-api v0.5.1-0.20240816033534-930640a4d608/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078 h1:CSBXQOe0AzlWcGaww8uqOUDu+/4bL3hVNBz86oziOis=
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
15 changes: 9 additions & 6 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
}

general.InfoS("GetTopologyHints called",
general.InfoS("called",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"containerName", req.ContainerName,
Expand Down Expand Up @@ -746,7 +746,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
}

general.InfoS("Allocate called",
general.InfoS("called",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"containerName", req.ContainerName,
Expand Down Expand Up @@ -830,7 +830,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
}()

allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName)
if allocationInfo != nil && allocationInfo.OriginalAllocationResult.Size() >= reqInt && !util.PodIsInVPAResizing(req) {
if allocationInfo != nil && allocationInfo.OriginalAllocationResult.Size() >= reqInt && !util.PodInplaceUpdateResizing(req) {
general.InfoS("already allocated and meet requirement",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func (p *DynamicPolicy) getContainerRequestedCores(allocationInfo *state.Allocat
// if either of these two annotations is present in memory state, it is a new pod,
// so we don't need to check the pod request from podWatcher
if allocationInfo.Annotations[consts.PodAnnotationAggregatedRequestsKey] != "" ||
allocationInfo.Annotations[consts.PodAnnotationVPAResizingKey] != "" {
allocationInfo.Annotations[consts.PodAnnotationInplaceUpdateResizingKey] != "" {
return allocationInfo.RequestQuantity
}
if state.CheckNUMABinding(allocationInfo) {
Expand All @@ -1176,20 +1176,23 @@ func (p *DynamicPolicy) getContainerRequestedCores(allocationInfo *state.Allocat
}

func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.ResourceRequest) (bool, error) {
_, reqFloat64, err := util.GetQuantityFromResourceReq(req)
_, reqFloat64, err := util.GetPodAggregatedRequestResource(req)
if err != nil {
return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
}

shareCoresAllocated := reqFloat64
podEntries := p.state.GetPodEntries()
for podUid, podEntry := range podEntries {
if podEntry.IsPoolEntry() {
continue
}
if podUid == req.PodUid {
continue
}
for _, allocation := range podEntry {
// shareCoresAllocated should include both main and sidecar containers
if allocation.QoSLevel == consts.PodAnnotationQoSLevelSharedCores && !allocation.CheckNumaBinding() {
if state.CheckShared(allocation) && !state.CheckNUMABinding(allocation) {
shareCoresAllocated += p.getContainerRequestedCores(allocation)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,6 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context
req.PodNamespace, req.PodName, req.ContainerName, err)
return nil, fmt.Errorf("updateAllocationInfoByReq failed with error: %v", err)
}
if util.PodIsInVPAResizing(req) {
if allocationInfo == nil {
general.Errorf("pod: %s/%s, container: %s request to VPA resize, but not allocation info",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("no allocation info to VPA resize")
}
general.Infof("pod: %s/%s, container: %s request to VPA resize (%.02f->%.02f)",
req.PodNamespace, req.PodName, req.ContainerName, allocationInfo.RequestQuantity, reqFloat64)
allocationInfo.RequestQuantity = reqFloat64
}

if allocationInfo == nil {
general.Infof("pod: %s/%s, container: %s is met firstly, do ramp up with pooled cpus: %s",
Expand Down Expand Up @@ -150,7 +140,7 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context
}
}
} else if allocationInfo.RampUp {
if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
general.Errorf("pod: %s/%s, container: %s is still in ramp up, not allow to VPA resize",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("pod is still ramp up, not allow to VPA resize")
Expand All @@ -164,16 +154,20 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context
allocationInfo.TopologyAwareAssignments = pooledCPUsTopologyAwareAssignments
allocationInfo.OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(pooledCPUsTopologyAwareAssignments)
} else {
if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
general.Infof("pod: %s/%s, container: %s request to VPA resize (%.02f->%.02f)",
req.PodNamespace, req.PodName, req.ContainerName, allocationInfo.RequestQuantity, reqFloat64)
allocationInfo.RequestQuantity = reqFloat64

p.state.SetAllocationInfo(allocationInfo.PodUid, allocationInfo.ContainerName, allocationInfo)
_, err := p.doAndCheckPutAllocationInfoPodVPAResizingAware(originAllocationInfo, allocationInfo, false, true)
_, err := p.doAndCheckPutAllocationInfoPodResizingAware(originAllocationInfo, allocationInfo, false, true)
if err != nil {
general.Errorf("pod: %s/%s, container: %s doAndCheckPutAllocationInfoPodVPAResizingAware failed: %q",
general.Errorf("pod: %s/%s, container: %s doAndCheckPutAllocationInfoPodResizingAware failed: %q",
req.PodNamespace, req.PodName, req.ContainerName, err)
p.state.SetAllocationInfo(originAllocationInfo.PodUid, originAllocationInfo.ContainerName, originAllocationInfo)
return nil, err
}
} else {
// FIXME: when is this branch reached?
_, err := p.doAndCheckPutAllocationInfo(allocationInfo, true)
if err != nil {
general.Errorf("pod: %s/%s, container: %s doAndCheckPutAllocationInfo failed: %q",
Expand Down Expand Up @@ -216,7 +210,7 @@ func (p *DynamicPolicy) reclaimedCoresAllocationHandler(_ context.Context,
return nil, fmt.Errorf("reclaimedCoresAllocationHandler got nil request")
}

if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
return nil, fmt.Errorf("not support VPA resize for reclaimed cores")
}

Expand Down Expand Up @@ -307,7 +301,7 @@ func (p *DynamicPolicy) dedicatedCoresAllocationHandler(ctx context.Context,
return nil, fmt.Errorf("dedicatedCoresAllocationHandler got nil req")
}

if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
return nil, fmt.Errorf("not support VPA resize for dedicated cores")
}

Expand Down Expand Up @@ -638,7 +632,7 @@ func (p *DynamicPolicy) allocateSharedNumaBindingCPUs(req *pluginapi.ResourceReq
RequestQuantity: reqFloat64,
}

if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
originAllocationInfo := p.state.GetAllocationInfo(allocationInfo.PodUid, allocationInfo.ContainerName)
if originAllocationInfo == nil {
general.Errorf("pod: %s/%s, container: %s request to cpu VPA resize alloation, but no origin allocation info, reject it",
Expand All @@ -649,10 +643,11 @@ func (p *DynamicPolicy) allocateSharedNumaBindingCPUs(req *pluginapi.ResourceReq
general.Infof("pod: %s/%s, container: %s request to cpu VPA resize allocation (%.02f->%.02f)",
req.PodNamespace, req.PodName, req.ContainerName, originAllocationInfo.RequestQuantity, allocationInfo.RequestQuantity)
p.state.SetAllocationInfo(allocationInfo.PodUid, allocationInfo.ContainerName, allocationInfo)
checkedAllocationInfo, err := p.doAndCheckPutAllocationInfoPodVPAResizingAware(originAllocationInfo, allocationInfo, false, true)
checkedAllocationInfo, err := p.doAndCheckPutAllocationInfoPodResizingAware(originAllocationInfo, allocationInfo, false, true)
if err != nil {
general.Errorf("pod: %s/%s, container: %s request to cpu VPA resize allocation, but doAndCheckPutAllocationInfoPodVPAResizingAware failed: %q",
general.Errorf("pod: %s/%s, container: %s request to cpu VPA resize allocation, but doAndCheckPutAllocationInfoPodResizingAware failed: %q",
req.PodNamespace, req.PodName, req.ContainerName, err)
p.state.SetAllocationInfo(originAllocationInfo.PodUid, originAllocationInfo.ContainerName, originAllocationInfo)
return nil, fmt.Errorf("doAndCheckPutAllocationInfo failed with error: %v", err)
}
return checkedAllocationInfo, nil
Expand All @@ -672,14 +667,14 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntries(allocationInfos
return p.putAllocationsAndAdjustAllocationEntriesVpaAware(nil, allocationInfos, incrByReq, false)
}

func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesVpaAware(originAllocationInfos, allocationInfos []*state.AllocationInfo, incrByReq, podIsInVpaResize bool) error {
func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesVpaAware(originAllocationInfos, allocationInfos []*state.AllocationInfo, incrByReq, podInplaceUpdateResizing bool) error {
if len(allocationInfos) == 0 {
return nil
}
if podIsInVpaResize {
if podInplaceUpdateResizing {
if len(originAllocationInfos) != 1 && len(allocationInfos) != 1 {
general.Errorf("cannot adjust allocation entries for invalid allocation infos")
return nil
return fmt.Errorf("invalid vpa allocation infos length")
}
}

Expand Down Expand Up @@ -713,7 +708,7 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesVpaAware(originA
}

poolsQuantityMap = machine.ParseCPUAssignmentQuantityMap(csetMap)
if podIsInVpaResize {
if podInplaceUpdateResizing {
// adjust pool resize
originAllocationInfo := originAllocationInfos[0]
allocationInfo := allocationInfos[0]
Expand Down Expand Up @@ -744,8 +739,8 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesVpaAware(originA
return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err)
}

if incrByReq || podIsInVpaResize {
if podIsInVpaResize {
if incrByReq || podInplaceUpdateResizing {
if podInplaceUpdateResizing {
general.Infof("pod: %s/%s, container: %s request to re-calc pool size for cpu VPA resize",
allocationInfos[0].PodNamespace, allocationInfos[0].PodName, allocationInfos[0].ContainerName)
}
Expand Down Expand Up @@ -1684,13 +1679,13 @@ func (p *DynamicPolicy) shouldSharedCoresRampUp(podUID string) bool {
}
}

func (p *DynamicPolicy) doAndCheckPutAllocationInfoPodVPAResizingAware(originAllocationInfo, allocationInfo *state.AllocationInfo, incrByReq, podIsInVPAResizing bool) (*state.AllocationInfo, error) {
func (p *DynamicPolicy) doAndCheckPutAllocationInfoPodResizingAware(originAllocationInfo, allocationInfo *state.AllocationInfo, incrByReq, podInplaceUpdateResizing bool) (*state.AllocationInfo, error) {
if allocationInfo == nil {
return nil, fmt.Errorf("doAndCheckPutAllocationInfo got nil allocationInfo")
}

// need to adjust pools and putAllocationsAndAdjustAllocationEntries will set the allocationInfo after adjusted
err := p.putAllocationsAndAdjustAllocationEntriesVpaAware([]*state.AllocationInfo{originAllocationInfo}, []*state.AllocationInfo{allocationInfo}, incrByReq, podIsInVPAResizing)
err := p.putAllocationsAndAdjustAllocationEntriesVpaAware([]*state.AllocationInfo{originAllocationInfo}, []*state.AllocationInfo{allocationInfo}, incrByReq, podInplaceUpdateResizing)
if err != nil {
general.Errorf("pod: %s/%s, container: %s putAllocationsAndAdjustAllocationEntriesVpaAware failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err)
Expand All @@ -1708,7 +1703,7 @@ func (p *DynamicPolicy) doAndCheckPutAllocationInfoPodVPAResizingAware(originAll
}

// doAndCheckPutAllocationInfo is the non-resizing entry point: it forwards
// allocationInfo and incrByReq to the resizing-aware variant, passing a nil
// origin allocation info and false for the resizing flag, and returns that
// call's result unchanged.
func (p *DynamicPolicy) doAndCheckPutAllocationInfo(allocationInfo *state.AllocationInfo, incrByReq bool) (*state.AllocationInfo, error) {
return p.doAndCheckPutAllocationInfoPodVPAResizingAware(nil, allocationInfo, incrByReq, false)
return p.doAndCheckPutAllocationInfoPodResizingAware(nil, allocationInfo, incrByReq, false)
}

func (p *DynamicPolicy) getReclaimOverlapShareRatio(entries state.PodEntries) (map[string]float64, error) {
Expand Down
89 changes: 33 additions & 56 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,24 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
return p.sharedCoresWithNUMABindingHintHandler(ctx, req)
}

ok, err := p.checkNormalShareCoresCpuResource(req)
if err != nil {
general.Errorf("failed to check share cores cpu resource for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("failed to check share cores cpu resource: %q", err)
}

if !ok {
_ = p.emitter.StoreInt64(util.MetricNameShareCoresNoEnoughResourceFailed, 1, metrics.MetricTypeNameCount, []metrics.MetricTag{
{
Key: "resource",
Val: "cpu",
},
{
Key: "userPodNs",
Val: req.PodNamespace,
},
{
Key: "userPodName",
Val: req.PodName,
},
{
Key: "userContainerName",
Val: req.ContainerName,
},
}...)
return nil, fmt.Errorf("no enough cpu resource")
// TODO: support sidecar follow main container for normal share cores in future
if req.ContainerType == pluginapi.ContainerType_MAIN {
ok, err := p.checkNormalShareCoresCpuResource(req)
if err != nil {
general.Errorf("failed to check share cores cpu resource for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("failed to check share cores cpu resource: %q", err)
}

if !ok {
_ = p.emitter.StoreInt64(util.MetricNameShareCoresNoEnoughResourceFailed, 1, metrics.MetricTypeNameCount, metrics.ConvertMapToTags(map[string]string{
"resource": v1.ResourceCPU.String(),
"podNamespace": req.PodNamespace,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, fmt.Errorf("no enough cpu resource")
}
}

return util.PackResourceHintsResponse(req, string(v1.ResourceCPU),
Expand All @@ -90,7 +81,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
func (p *DynamicPolicy) reclaimedCoresHintHandler(ctx context.Context,
req *pluginapi.ResourceRequest,
) (*pluginapi.ResourceHintsResponse, error) {
if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
return nil, fmt.Errorf("not support VPA for reclaimed cores")
}
return p.sharedCoresHintHandler(ctx, req)
Expand All @@ -103,7 +94,7 @@ func (p *DynamicPolicy) dedicatedCoresHintHandler(ctx context.Context,
return nil, fmt.Errorf("dedicatedCoresHintHandler got nil req")
}

if util.PodIsInVPAResizing(req) {
if util.PodInplaceUpdateResizing(req) {
return nil, fmt.Errorf("not support VPA for dedicated cores")
}

Expand Down Expand Up @@ -554,56 +545,43 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
if allocationInfo != nil {
hints = cpuutil.RegenerateHints(allocationInfo, reqInt)

// regenerateHints failed. need to clear container record and re-calculate.
if hints == nil {
// [TODO]: generateMachineStateFromPodEntries adapts to shared_cores with numa_binding
// clear the current container and regenerate machine state in the following cases:
// 1. regenerateHints failed.
// 2. the container is in inplace update resizing.
// in both cases, generate hints for it as if it were a new container
if hints == nil || util.PodInplaceUpdateResizing(req) {
machineState, err = p.clearContainerAndRegenerateMachineState(podEntries, req)
if err != nil {
general.Errorf("pod: %s/%s, container: %s clearContainerAndRegenerateMachineState failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
return nil, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
}
}
} else if util.PodIsInVPAResizing(req) {
} else if util.PodInplaceUpdateResizing(req) {
general.Errorf("pod: %s/%s, container: %s request to cpu VPA resize, but no origin allocation info",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("no origin allocation info")
}

general.Infof("pod: %s/%s, container: %s, VPA: %v", req.PodNamespace, req.PodName, req.ContainerName, util.PodIsInVPAResizing(req))
if util.PodIsInVPAResizing(req) {
general.Infof("pod: %s/%s, container: %s, VPA: %v", req.PodNamespace, req.PodName, req.ContainerName, util.PodInplaceUpdateResizing(req))
if util.PodInplaceUpdateResizing(req) {
numaset := allocationInfo.GetAllocationResultNUMASet()
if numaset.Size() != 1 {
general.Errorf("pod: %s/%s, container: %s is snb, but its numa set size is %d",
req.PodNamespace, req.PodName, req.ContainerName, numaset.Size())
return nil, fmt.Errorf("snb port not support cross numa")
}
nodeID := numaset.ToSliceInt()[0]
availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
allocatedReq, ok := allocationInfo.GetPodAggregatedRequest()
if !ok {
allocatedReq = 0
containerEntries := podEntries[req.PodUid]
for _, containerEntry := range containerEntries {
allocatedReq += containerEntry.RequestQuantity
}
}
allocatedReqInt := int(allocatedReq)

general.Infof("pod: %s/%s, container: %s request cpu VPA on numa %d (available: %d, allocated: %d, request: %d)",
req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, allocatedReqInt, reqInt)
if allocatedReqInt < reqInt && reqInt-allocatedReqInt > availableCPUQuantity { // no left resource to scale out
general.Infof("pod: %s/%s, container: %s request cpu VPA on numa %d (available: %d, request: %d)",
req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, reqInt)
if reqInt > availableCPUQuantity { // no left resource to scale out
general.Infof("pod: %s/%s, container: %s request cpu VPA, but no enough resource for it in current NUMA, checking migratable",
req.PodNamespace, req.PodName, req.ContainerName)
// FIXME move isVPAMigratable to config
// TODO move isVPAMigratable to config
isVpaMigratable := false
if isVpaMigratable {
machineState, err = p.clearContainerAndRegenerateMachineState(podEntries, req)
if err != nil {
general.Errorf("pod: %s/%s, container: %s clearContainerAndRegenerateMachineState failed in VPA mode with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
return nil, err
}

general.Infof("pod: %s/%s, container: %s request VPA and no enough resource in current NUMA, try to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
var calculateErr error
Expand Down Expand Up @@ -640,7 +618,6 @@ func (p *DynamicPolicy) clearContainerAndRegenerateMachineState(podEntries state
}

var err error
// [TODO]: generateMachineStateFromPodEntries adapts to shared_cores with numa_binding
machineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
if err != nil {
return nil, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
Expand Down
Loading

0 comments on commit fd505f6

Please sign in to comment.