Skip to content

Commit

Permalink
Merge pull request #665 from nightmeng/dev/support-qrm-vpa-admit
Browse files Browse the repository at this point in the history
feat(qrm): support pod inplace update resize admit
  • Loading branch information
nightmeng authored Aug 21, 2024
2 parents e47e097 + 4908830 commit c2f518f
Show file tree
Hide file tree
Showing 23 changed files with 3,927 additions and 196 deletions.
4 changes: 2 additions & 2 deletions cmd/katalyst-agent/app/options/qrm/qrm_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfig
conf.ExtraStateFileAbsPath = o.ExtraStateFileAbsPath
conf.PodDebugAnnoKeys = o.PodDebugAnnoKeys
conf.UseKubeletReservedConfig = o.UseKubeletReservedConfig
conf.PodAnnotationKeptKeys = o.PodAnnotationKeptKeys
conf.PodLabelKeptKeys = o.PodLabelKeptKeys
conf.PodAnnotationKeptKeys = append(conf.PodAnnotationKeptKeys, o.PodAnnotationKeptKeys...)
conf.PodLabelKeptKeys = append(conf.PodLabelKeptKeys, o.PodLabelKeptKeys...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/google/uuid v1.3.0
github.com/h2non/gock v1.2.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d h1:6CuK3axf2B63zIkEu5XyxbaC+JArE/3Jo3QHvb+Hn0M=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078 h1:CSBXQOe0AzlWcGaww8uqOUDu+/4bL3hVNBz86oziOis=
github.com/kubewharf/katalyst-api v0.5.1-0.20240820031712-7c1239991078/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
187 changes: 159 additions & 28 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dynamicpolicy
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -436,24 +437,31 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
return nil, fmt.Errorf("GetNumaAwareAssignments err: %v", err)
}

podResources := make(map[string]*pluginapi.ContainerResources)
var allocationInfosJustFinishRampUp []*state.AllocationInfo
needUpdateMachineState := false
for podUID, containerEntries := range podEntries {
// if it's a pool, not returning to QRM
if containerEntries.IsPoolEntry() {
continue
}

if podResources[podUID] == nil {
podResources[podUID] = &pluginapi.ContainerResources{}
}

mainContainerAllocationInfo := podEntries[podUID].GetMainContainerEntry()
for containerName, allocationInfo := range containerEntries {
if allocationInfo == nil {
continue
}
allocationInfo = allocationInfo.Clone()

// sync allocation info from main container to sidecar
if allocationInfo.CheckSideCar() && mainContainerAllocationInfo != nil {
if p.applySidecarAllocationInfoFromMainContainer(allocationInfo, mainContainerAllocationInfo) {
general.Infof("pod: %s/%s, container: %s sync allocation info from main container",
allocationInfo.PodNamespace, allocationInfo.PodName, containerName)
p.state.SetAllocationInfo(podUID, containerName, allocationInfo)
needUpdateMachineState = true
}
}

initTs, tsErr := time.Parse(util.QRMTimeFormat, allocationInfo.InitTimestamp)
if tsErr != nil {
if state.CheckShared(allocationInfo) && !state.CheckNUMABinding(allocationInfo) {
Expand Down Expand Up @@ -484,6 +492,39 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
}
}

}
}

if len(allocationInfosJustFinishRampUp) > 0 {
if err = p.putAllocationsAndAdjustAllocationEntries(allocationInfosJustFinishRampUp, true); err != nil {
// not influencing return response to kubelet when putAllocationsAndAdjustAllocationEntries failed
general.Errorf("putAllocationsAndAdjustAllocationEntries failed with error: %v", err)
}
} else if needUpdateMachineState {
// NOTE: we only need update machine state when putAllocationsAndAdjustAllocationEntries is skipped,
// because putAllocationsAndAdjustAllocationEntries will update machine state.
general.Infof("GetResourcesAllocation update machine state")
podEntries = p.state.GetPodEntries()
updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
if err != nil {
general.Errorf("GetResourcesAllocation GenerateMachineStateFromPodEntries failed with error: %v", err)
return nil, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
}
p.state.SetMachineState(updatedMachineState)
}

podEntries = p.state.GetPodEntries()
podResources := make(map[string]*pluginapi.ContainerResources)
for podUID, containerEntries := range podEntries {
if containerEntries.IsPoolEntry() {
continue
}

if podResources[podUID] == nil {
podResources[podUID] = &pluginapi.ContainerResources{}
}

for containerName, allocationInfo := range containerEntries {
if podResources[podUID].ContainerResources == nil {
podResources[podUID].ContainerResources = make(map[string]*pluginapi.ResourceAllocation)
}
Expand All @@ -501,13 +542,6 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
}
}

if len(allocationInfosJustFinishRampUp) > 0 {
if err = p.putAllocationsAndAdjustAllocationEntries(allocationInfosJustFinishRampUp, true); err != nil {
// not influencing return response to kubelet when putAllocationsAndAdjustAllocationEntries failed
general.Errorf("putAllocationsAndAdjustAllocationEntries failed with error: %v", err)
}
}

return &pluginapi.GetResourcesAllocationResponse{
PodResources: podResources,
}, nil
Expand Down Expand Up @@ -639,7 +673,8 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
"qosLevel", qosLevel,
"numCPUsInt", reqInt,
"numCPUsFloat64", reqFloat64,
"isDebugPod", isDebugPod)
"isDebugPod", isDebugPod,
"annotation", req.Annotations)

if req.ContainerType == pluginapi.ContainerType_INIT || isDebugPod {
general.Infof("there is no NUMA preference, return nil hint")
Expand Down Expand Up @@ -721,7 +756,8 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
"qosLevel", qosLevel,
"numCPUsInt", reqInt,
"numCPUsFloat64", reqFloat64,
"isDebugPod", isDebugPod)
"isDebugPod", isDebugPod,
"annotations", req.Annotations)

if req.ContainerType == pluginapi.ContainerType_INIT {
return &pluginapi.ResourceAllocationResponse{
Expand Down Expand Up @@ -794,7 +830,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
}()

allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName)
if allocationInfo != nil && allocationInfo.OriginalAllocationResult.Size() >= reqInt {
if allocationInfo != nil && allocationInfo.OriginalAllocationResult.Size() >= reqInt && !util.PodInplaceUpdateResizing(req) {
general.InfoS("already allocated and meet requirement",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
Expand Down Expand Up @@ -1094,22 +1130,117 @@ func (p *DynamicPolicy) getContainerRequestedCores(allocationInfo *state.Allocat
return 0
}

if allocationInfo.RequestQuantity == 0 {
if p.metaServer == nil {
general.Errorf("got nil metaServer")
return 0
if p.metaServer == nil {
general.Errorf("got nil metaServer")
return allocationInfo.RequestQuantity
}

container, err := p.metaServer.GetContainerSpec(allocationInfo.PodUid, allocationInfo.ContainerName)
if err != nil || container == nil {
general.Errorf("get container failed with error: %v", err)
return allocationInfo.RequestQuantity
}

cpuQuantity := native.CPUQuantityGetter()(container.Resources.Requests)
metaValue := general.MaxFloat64(float64(cpuQuantity.MilliValue())/1000.0, 0)

// optimize this logic someday:
// only for refresh cpu request for old pod with cpu ceil and old inplace update resized pods.
if state.CheckShared(allocationInfo) {
// if there is these two annotations in memory state, it is a new pod,
// we don't need to check the pod request from podWatcher
if allocationInfo.Annotations[consts.PodAnnotationAggregatedRequestsKey] != "" ||
allocationInfo.Annotations[consts.PodAnnotationInplaceUpdateResizingKey] != "" {
return allocationInfo.RequestQuantity
}
if state.CheckNUMABinding(allocationInfo) {
if metaValue < allocationInfo.RequestQuantity {
general.Infof("[snb] get cpu request quantity: (%.3f->%.3f) for pod: %s/%s container: %s from podWatcher",
allocationInfo.RequestQuantity, metaValue, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
return metaValue
}
} else {
if metaValue != allocationInfo.RequestQuantity {
general.Infof("[share] get cpu request quantity: (%.3f->%.3f) for pod: %s/%s container: %s from podWatcher",
allocationInfo.RequestQuantity, metaValue, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
return metaValue
}
}
} else if allocationInfo.RequestQuantity == 0 {
general.Infof("[other] get cpu request quantity: (%.3f->%.3f) for pod: %s/%s container: %s from podWatcher",
allocationInfo.RequestQuantity, metaValue, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
return metaValue
}

container, err := p.metaServer.GetContainerSpec(allocationInfo.PodUid, allocationInfo.ContainerName)
if err != nil || container == nil {
general.Errorf("get container failed with error: %v", err)
return 0
return allocationInfo.RequestQuantity
}

// checkNormalShareCoresCpuResource checks whether the node has enough pooled CPU
// capacity to admit a normal (non-NUMA-binding) share-cores pod described by req.
// It returns (true, nil) when the aggregated share-cores request — including the
// incoming pod — fits into the filtered available CPU set, (false, nil) when it
// does not, and a non-nil error only when the request itself cannot be parsed.
func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.ResourceRequest) (bool, error) {
	_, reqFloat64, err := util.GetPodAggregatedRequestResource(req)
	if err != nil {
		return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
	}

	// Start with the incoming pod's own request, then add every other
	// share-cores (non-NUMA-binding) container already admitted on the node.
	shareCoresAllocated := reqFloat64
	podEntries := p.state.GetPodEntries()
	for podUid, podEntry := range podEntries {
		if podEntry.IsPoolEntry() {
			continue
		}
		// Skip the requesting pod itself: on inplace-update resize its old
		// allocation must not be double-counted against the new request.
		if podUid == req.PodUid {
			continue
		}
		for _, allocation := range podEntry {
			// shareCoresAllocated should involve both main and sidecar containers
			if state.CheckShared(allocation) && !state.CheckNUMABinding(allocation) {
				shareCoresAllocated += p.getContainerRequestedCores(allocation)
			}
		}
	}

	// Available capacity = machine CPUs minus reserved CPUs and CPUs held by
	// dedicated / NUMA-binding workloads.
	machineState := p.state.GetMachineState()
	pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
		state.CheckDedicated, state.CheckNUMABinding)

	// Round fractional requests up: a pod needing 2.1 cores must fit in 3.
	shareCoresAllocatedInt := int(math.Ceil(shareCoresAllocated))
	general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
	if shareCoresAllocatedInt > pooledCPUs.Size() {
		general.Warningf("[checkNormalShareCoresCpuResource] no enough cpu resource for normal share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)",
			req.PodNamespace, req.PodName, req.ContainerName, reqFloat64, shareCoresAllocatedInt, pooledCPUs.Size())
		return false, nil
	}

	// NOTE: original message said "memory" — copy-paste from the memory plugin;
	// this function checks cpu.
	general.InfoS("checkNormalShareCoresCpuResource cpu successfully",
		"podNamespace", req.PodNamespace,
		"podName", req.PodName,
		"containerName", req.ContainerName,
		"request", reqFloat64)

	return true, nil
}

// applySidecarAllocationInfoFromMainContainer mirrors the main container's CPU
// allocation onto a sidecar container's allocation info, so both share the same
// pool and cpuset. It mutates sidecarAllocationInfo in place and returns true
// if anything was changed (callers use the return value to decide whether
// machine state must be regenerated).
func (p *DynamicPolicy) applySidecarAllocationInfoFromMainContainer(sidecarAllocationInfo, mainAllocationInfo *state.AllocationInfo) bool {
	changed := false
	// Copy pool/cpuset data from the main container whenever any of the
	// tracked fields has drifted from the main container's values.
	if sidecarAllocationInfo.OwnerPoolName != mainAllocationInfo.OwnerPoolName ||
		!sidecarAllocationInfo.AllocationResult.Equals(mainAllocationInfo.AllocationResult) ||
		!sidecarAllocationInfo.OriginalAllocationResult.Equals(mainAllocationInfo.OriginalAllocationResult) ||
		!state.CheckAllocationInfoTopologyAwareAssignments(sidecarAllocationInfo, mainAllocationInfo) ||
		!state.CheckAllocationInfoOriginTopologyAwareAssignments(sidecarAllocationInfo, mainAllocationInfo) {

		// Clone/deep-copy so the sidecar never aliases the main container's
		// mutable structures.
		sidecarAllocationInfo.OwnerPoolName = mainAllocationInfo.OwnerPoolName
		sidecarAllocationInfo.AllocationResult = mainAllocationInfo.AllocationResult.Clone()
		sidecarAllocationInfo.OriginalAllocationResult = mainAllocationInfo.OriginalAllocationResult.Clone()
		sidecarAllocationInfo.TopologyAwareAssignments = machine.DeepcopyCPUAssignment(mainAllocationInfo.TopologyAwareAssignments)
		sidecarAllocationInfo.OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(mainAllocationInfo.OriginalTopologyAwareAssignments)

		changed = true
	}

	// Refresh the sidecar's recorded request quantity as well; this may read
	// the pod spec via metaServer (see getContainerRequestedCores).
	request := p.getContainerRequestedCores(sidecarAllocationInfo)
	if sidecarAllocationInfo.RequestQuantity != request {
		sidecarAllocationInfo.RequestQuantity = request
		changed = true
	}

	return changed
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,15 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
}

if newEntries[podUID][containerName] != nil {
// adapt to old checkpoint without RequestQuantity property
newEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)
continue
}

if newEntries[podUID] == nil {
newEntries[podUID] = make(state.ContainerEntries)
}
newEntries[podUID][containerName] = allocationInfo.Clone()
// adapt to old checkpoint without RequestQuantity property
newEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)

switch allocationInfo.QoSLevel {
case consts.PodAnnotationQoSLevelDedicatedCores:
Expand Down
Loading

0 comments on commit c2f518f

Please sign in to comment.