Skip to content

Commit

Permalink
feat(qrm): involve memory bandwidth when admitting in cpu plugin
Browse files Browse the repository at this point in the history
csfldf committed Jul 23, 2024
1 parent 667b91a commit 6196bbf
Showing 14 changed files with 788 additions and 30 deletions.
16 changes: 13 additions & 3 deletions cmd/katalyst-agent/app/options/qrm/qrm_base.go
Original file line number Diff line number Diff line change
@@ -28,13 +28,17 @@ type GenericQRMPluginOptions struct {
ExtraStateFileAbsPath string
PodDebugAnnoKeys []string
UseKubeletReservedConfig bool
PodAnnotationKeptKeys []string
PodLabelKeptKeys []string
}

func NewGenericQRMPluginOptions() *GenericQRMPluginOptions {
return &GenericQRMPluginOptions{
QRMPluginSocketDirs: []string{"/var/lib/kubelet/plugins_registry"},
StateFileDirectory: "/var/lib/katalyst/qrm_advisor",
PodDebugAnnoKeys: []string{},
QRMPluginSocketDirs: []string{"/var/lib/kubelet/plugins_registry"},
StateFileDirectory: "/var/lib/katalyst/qrm_advisor",
PodDebugAnnoKeys: []string{},
PodAnnotationKeptKeys: []string{},
PodLabelKeptKeys: []string{},
}
}

@@ -49,6 +53,10 @@ func (o *GenericQRMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.PodDebugAnnoKeys, "pod annotations keys to identify the pod is a debug pod, and qrm plugins will apply specific strategy to it")
fs.BoolVar(&o.UseKubeletReservedConfig, "use-kubelet-reserved-config",
o.UseKubeletReservedConfig, "if set true, we will prefer to use kubelet reserved config to reserved resource configuration in katalyst")
fs.StringSliceVar(&o.PodAnnotationKeptKeys, "pod-annotation-kept-keys",
o.PodAnnotationKeptKeys, "pod annotation keys will be kept in qrm state")
fs.StringSliceVar(&o.PodLabelKeptKeys, "pod-label-kept-keys",
o.PodLabelKeptKeys, "pod label keys will be kept in qrm state")
}

func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfiguration) error {
@@ -57,6 +65,8 @@ func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfig
conf.ExtraStateFileAbsPath = o.ExtraStateFileAbsPath
conf.PodDebugAnnoKeys = o.PodDebugAnnoKeys
conf.UseKubeletReservedConfig = o.UseKubeletReservedConfig
conf.PodAnnotationKeptKeys = o.PodAnnotationKeptKeys
conf.PodLabelKeptKeys = o.PodLabelKeptKeys
return nil
}

8 changes: 6 additions & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
@@ -127,6 +127,8 @@ type DynamicPolicy struct {
qosConfig *generic.QoSConfiguration
dynamicConfig *dynamicconfig.DynamicAgentConfiguration
podDebugAnnoKeys []string
podAnnotationKeptKeys []string
podLabelKeptKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
@@ -200,6 +202,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
enableCPUIdle: conf.CPUQRMPluginConfig.EnableCPUIdle,
reclaimRelativeRootCgroupPath: conf.ReclaimRelativeRootCgroupPath,
podDebugAnnoKeys: conf.PodDebugAnnoKeys,
podAnnotationKeptKeys: conf.PodAnnotationKeptKeys,
podLabelKeptKeys: conf.PodLabelKeptKeys,
transitionPeriod: 30 * time.Second,
}

@@ -612,7 +616,7 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
// we should do it before GetKatalystQoSLevelFromResourceReq.
isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys)

qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req)
qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys)
if err != nil {
err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
@@ -687,7 +691,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
// we should do it before GetKatalystQoSLevelFromResourceReq.
isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys)

qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req)
qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys)
if err != nil {
err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
255 changes: 249 additions & 6 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
@@ -23,13 +23,19 @@ import (
"sort"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
@@ -131,7 +137,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte
if hints == nil {
var calculateErr error
// calculate hint for container without allocated cpus
hints, calculateErr = p.calculateHints(reqInt, machineState, req.Annotations)
hints, calculateErr = p.calculateHints(reqInt, machineState, req)
if calculateErr != nil {
return nil, fmt.Errorf("calculateHints failed with error: %v", calculateErr)
}
@@ -149,9 +155,14 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co

// calculateHints is a helper function to calculate the topology hints
// with the given container requests.
func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMap,
reqAnnotations map[string]string,
func (p *DynamicPolicy) calculateHints(reqInt int,
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
if req == nil {
return nil, fmt.Errorf("nil req in calculateHints")
}

numaNodes := make([]int, 0, len(machineState))
for numaNode := range machineState {
numaNodes = append(numaNodes, numaNode)
@@ -169,8 +180,8 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
}

numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations)
numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations)
numaBinding := qosutil.AnnotationsIndicateNUMABinding(req.Annotations)
numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(req.Annotations)

// because it's hard to control memory allocation accurately,
// we only support numa_binding but not exclusive container with request smaller than 1 NUMA
@@ -210,6 +221,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa
numaBound = minNUMAsCountNeeded + 1
}

preferredHintIndexes := []int{}
machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) {
maskCount := mask.Count()
if maskCount < minNUMAsCountNeeded {
@@ -239,15 +251,246 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa
return
}

preferred := maskCount == minNUMAsCountNeeded
hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{
Nodes: machine.MaskToUInt64Array(mask),
Preferred: len(maskBits) == minNUMAsCountNeeded,
Preferred: preferred,
})

if preferred {
preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1)
}
})

if numaBound > machine.MBWNUMAsPoint {
numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer)

general.InfoS("getNUMAAllocatedMemBW",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"numaAllocatedMemBW", numaAllocatedMemBW)

if err != nil {
general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err)
_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
} else {
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints,
reqInt, numaAllocatedMemBW, req)
}
}

return hints, nil
}

// getNUMAAllocatedMemBW sums, per NUMA node, the memory bandwidth already
// claimed by the pods recorded in machineState.
//
// Only main containers for which state.CheckDedicatedNUMABinding holds are
// counted. A pod's request is resolved once through
// spd.GetContainerMemoryBandwidthRequest and then divided evenly (integer
// division) among the NUMA nodes on which the pod was found.
//
// An error is returned when metaServer is nil or when a pod's request cannot
// be resolved; nil per-NUMA states are logged and skipped.
func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer) (map[int]int, error) {
	if metaServer == nil {
		return nil, fmt.Errorf("getNUMAAllocatedMemBW got nil metaServer")
	}

	memBWReqByPod := make(map[string]int)
	boundNUMAsByPod := make(map[string]sets.Int)

	// Pass 1: collect each qualifying pod's bandwidth request and the set of
	// NUMA nodes it occupies.
	for numaID, numaState := range machineState {
		if numaState == nil {
			general.Errorf("numaState is nil, NUMA: %d", numaID)
			continue
		}

		for _, containerEntries := range numaState.PodEntries {
			for _, info := range containerEntries {
				if !state.CheckDedicatedNUMABinding(info) || !info.CheckMainContainer() {
					continue
				}

				podUID := info.PodUid

				// The request is looked up only the first time a pod is seen.
				if _, known := memBWReqByPod[podUID]; !known {
					podMeta := metav1.ObjectMeta{
						UID:         types.UID(podUID),
						Namespace:   info.PodNamespace,
						Name:        info.PodName,
						Labels:      info.Labels,
						Annotations: info.Annotations,
					}
					requestedCores := int(math.Ceil(state.GetContainerRequestedCores()(info)))

					memBWReq, err := spd.GetContainerMemoryBandwidthRequest(metaServer, podMeta, requestedCores)
					if err != nil {
						return nil, fmt.Errorf("GetContainerMemoryBandwidthRequest for pod: %s/%s, container: %s failed with error: %v",
							info.PodNamespace, info.PodName, info.ContainerName, err)
					}

					memBWReqByPod[podUID] = memBWReq
				}

				boundNUMAs := boundNUMAsByPod[podUID]
				if boundNUMAs == nil {
					boundNUMAs = sets.NewInt()
					boundNUMAsByPod[podUID] = boundNUMAs
				}

				boundNUMAs.Insert(numaID)
			}
		}
	}

	// Pass 2: spread every pod's request evenly over its NUMA nodes.
	allocatedByNUMA := make(map[int]int)
	for podUID, boundNUMAs := range boundNUMAsByPod {
		memBWReq, known := memBWReqByPod[podUID]
		if !known {
			return nil, fmt.Errorf("pod: %s is found in podUIDToBindingNUMAs, but not found in podUIDToMemBWReq", podUID)
		}

		boundCount := boundNUMAs.Len()
		if boundCount == 0 {
			continue
		}

		share := memBWReq / boundCount
		for _, numaID := range boundNUMAs.UnsortedList() {
			allocatedByNUMA[numaID] += share
		}
	}

	return allocatedByNUMA, nil
}

// updatePreferredCPUHintsByMemBW re-evaluates the hints in cpuHints selected
// by preferredHintIndexes against memory bandwidth and clears the Preferred
// flag of those for which getPreferenceByMemBW reports false.
//
// reqInt is forwarded to spd.GetContainerMemoryBandwidthRequest to resolve the
// container's bandwidth request, and numaAllocatedMemBW is the per-NUMA
// bandwidth that is already allocated. The check is best-effort: lookup or
// evaluation failures are logged and leave the affected hints untouched, and
// when no evaluated hint is confirmed as preferred, every selected hint is
// restored to Preferred.
func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, reqInt int,
	numaAllocatedMemBW map[int]int, req *pluginapi.ResourceRequest,
) {
	switch {
	case len(preferredHintIndexes) == 0:
		general.Infof("there is no preferred hints, skip update")
		return
	case req == nil:
		general.Errorf("empty req")
		return
	}

	podMeta := metav1.ObjectMeta{
		UID:         types.UID(req.PodUid),
		Namespace:   req.PodNamespace,
		Name:        req.PodName,
		Labels:      req.Labels,
		Annotations: req.Annotations,
	}

	memBWReq, err := spd.GetContainerMemoryBandwidthRequest(p.metaServer, podMeta, reqInt)
	if err != nil {
		general.Errorf("GetContainerMemoryBandwidthRequest failed with error: %v", err)
		return
	}

	general.InfoS("GetContainerMemoryBandwidthRequest",
		"podNamespace", req.PodNamespace,
		"podName", req.PodName,
		"containerMemoryBandwidthRequest", memBWReq)

	// Becomes true as soon as one hint passes the bandwidth check.
	anyConfirmed := false
	for _, idx := range preferredHintIndexes {
		hint := cpuHints[idx]
		if len(hint.Nodes) == 0 {
			continue
		}

		stillPreferred, prefErr := getPreferenceByMemBW(hint.Nodes, memBWReq,
			numaAllocatedMemBW, p.machineInfo,
			p.metaServer, req)
		if prefErr != nil {
			general.Errorf("getPreferenceByMemBW for hints: %#v failed with error: %v", hint.Nodes, prefErr)
			_ = p.emitter.StoreInt64(util.MetricNameGetMemBWPreferenceFailed, 1, metrics.MetricTypeNameRaw)
			continue
		}

		if stillPreferred {
			anyConfirmed = true
			continue
		}

		hint.Preferred = false
		general.Infof("update hint: %#v preference to false", hint.Nodes)
	}

	if anyConfirmed {
		return
	}

	// The memory bandwidth check is best-effort: if it did not confirm any
	// preferred hint, restore the preference of the ones it cleared.
	// The topology manager picks the final result after merging all hints.
	for _, idx := range preferredHintIndexes {
		hint := cpuHints[idx]
		if hint.Preferred {
			continue
		}

		hint.Preferred = true
		general.Infof("revert hint: %#v preference to true", hint.Nodes)
	}
}

// getPreferenceByMemBW reports whether a hint covering targetNUMANodesUInt64
// may stay preferred once containerMemoryBandwidthRequest is added on top of
// numaAllocatedMemBW.
//
// The request is divided evenly (integer division) among the target NUMA
// nodes and added to a copy of numaAllocatedMemBW. It returns (false, nil) as
// soon as one group of a target NUMA and its siblings ends up with more
// allocated than allocatable bandwidth, and (true, nil) otherwise. An error
// is returned for a nil req, an empty target list, a nil machineInfo or
// ExtraTopologyInfo, a nil metaServer, or a NUMA id that cannot be converted
// to int.
func getPreferenceByMemBW(targetNUMANodesUInt64 []uint64,
	containerMemoryBandwidthRequest int, numaAllocatedMemBW map[int]int,
	machineInfo *machine.KatalystMachineInfo,
	metaServer *metaserver.MetaServer, req *pluginapi.ResourceRequest,
) (bool, error) {
	switch {
	case req == nil:
		return false, fmt.Errorf("empty req")
	case len(targetNUMANodesUInt64) == 0:
		return false, fmt.Errorf("empty targetNUMANodes")
	case machineInfo == nil || machineInfo.ExtraTopologyInfo == nil:
		return false, fmt.Errorf("invalid machineInfo")
	case metaServer == nil:
		return false, fmt.Errorf("nil metaServer")
	}

	targetNUMANodes := make([]int, 0, len(targetNUMANodesUInt64))
	for _, rawID := range targetNUMANodesUInt64 {
		numaID, err := general.CovertUInt64ToInt(rawID)
		if err != nil {
			return false, fmt.Errorf("convert NUMA: %d to int failed with error: %v", rawID, err)
		}

		targetNUMANodes = append(targetNUMANodes, numaID)
	}

	// Work on a copy so the caller's accounting is left untouched.
	share := containerMemoryBandwidthRequest / len(targetNUMANodes)
	projectedMemBW := general.DeepCopyIntToIntMap(numaAllocatedMemBW)
	for _, numaID := range targetNUMANodes {
		projectedMemBW[numaID] += share
	}

	topo := machineInfo.ExtraTopologyInfo
	visited := sets.NewInt()
	groupID := 0

	// Aggregate each target NUMA and all of its sibling NUMAs into a group and
	// compute the group's allocated and allocatable memory bandwidth.
	// Currently, a single group whose allocated memory bandwidth is greater
	// than its allocatable memory bandwidth makes the hint non-preferred.
	// In the future, per-group statistics of each hint could be gathered to
	// find the most suitable hint and set only its preference to true.
	for _, numaID := range targetNUMANodes {
		if visited.Has(numaID) {
			continue
		}

		members := append([]int{numaID}, topo.SiblingNumaMap[numaID].UnsortedList()...)

		groupAllocated, groupAllocatable := 0, 0
		for _, member := range members {
			groupAllocated += projectedMemBW[member]
			groupAllocatable += int(topo.SiblingNumaAvgMBWAllocatableMap[member])
			visited.Insert(member)
		}

		general.InfoS("getPreferenceByMemBW",
			"podNamespace", req.PodNamespace,
			"podName", req.PodName,
			"groupID", groupID,
			"targetNUMANodes", targetNUMANodes,
			"groupNUMAsAllocatedMemBW", groupAllocated,
			"groupNUMAsAllocatableMemBW", groupAllocatable)

		if groupAllocated > groupAllocatable {
			return false, nil
		}

		groupID++
	}

	return true, nil
}

func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
req *pluginapi.ResourceRequest,
) (*pluginapi.ResourceHintsResponse, error) {
Loading

0 comments on commit 6196bbf

Please sign in to comment.