Skip to content

Commit

Permalink
fix(qrm): support topology checking switch for normal share cores pods
Browse files Browse the repository at this point in the history
  • Loading branch information
nightmeng committed Oct 12, 2024
1 parent b32ee5b commit 1f93cf4
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 65 deletions.
21 changes: 13 additions & 8 deletions cmd/katalyst-agent/app/options/qrm/memory_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
)

type MemoryOptions struct {
PolicyName string
ReservedMemoryGB uint64
SkipMemoryStateCorruption bool
EnableSettingMemoryMigrate bool
EnableMemoryAdvisor bool
ExtraControlKnobConfigFile string
EnableOOMPriority bool
OOMPriorityPinnedMapAbsPath string
PolicyName string
ReservedMemoryGB uint64
SkipMemoryStateCorruption bool
EnableSettingMemoryMigrate bool
EnableMemoryAdvisor bool
ExtraControlKnobConfigFile string
EnableOOMPriority bool
OOMPriorityPinnedMapAbsPath string
EnableNonBindingShareCoresMemoryResourceCheck bool

SockMemOptions
LogCacheOptions
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewMemoryOptions() *MemoryOptions {
EnableSettingMemoryMigrate: false,
EnableMemoryAdvisor: false,
EnableOOMPriority: false,
EnableNonBindingShareCoresMemoryResourceCheck: true,
SockMemOptions: SockMemOptions{
EnableSettingSockMem: false,
SetGlobalTCPMemRatio: 20, // default: 20% * {host total memory}
Expand Down Expand Up @@ -105,6 +107,8 @@ func (o *MemoryOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.ExtraControlKnobConfigFile, "the absolute path of extra control knob config file")
fs.BoolVar(&o.EnableOOMPriority, "enable-oom-priority",
o.EnableOOMPriority, "if set true, we will enable oom priority enhancement")
fs.BoolVar(&o.EnableNonBindingShareCoresMemoryResourceCheck, "enable-non-binding-share-cores-memory-resource-check",
o.EnableNonBindingShareCoresMemoryResourceCheck, "enable the topology check for non-binding shares cores pods")
fs.StringVar(&o.OOMPriorityPinnedMapAbsPath, "oom-priority-pinned-bpf-map-path",
o.OOMPriorityPinnedMapAbsPath, "the absolute path of oom priority pinned bpf map")
fs.BoolVar(&o.EnableSettingSockMem, "enable-setting-sockmem",
Expand Down Expand Up @@ -137,6 +141,7 @@ func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error {
conf.EnableMemoryAdvisor = o.EnableMemoryAdvisor
conf.ExtraControlKnobConfigFile = o.ExtraControlKnobConfigFile
conf.EnableOOMPriority = o.EnableOOMPriority
conf.EnableNonBindingShareCoresMemoryResourceCheck = o.EnableNonBindingShareCoresMemoryResourceCheck
conf.OOMPriorityPinnedMapAbsPath = o.OOMPriorityPinnedMapAbsPath
conf.EnableSettingSockMem = o.EnableSettingSockMem
conf.SetGlobalTCPMemRatio = o.SetGlobalTCPMemRatio
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ func (p *DynamicPolicy) getContainerRequestedCores(allocationInfo *state.Allocat
return allocationInfo.RequestQuantity
}

func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.ResourceRequest) (bool, error) {
func (p *DynamicPolicy) checkNonBindingShareCoresCpuResource(req *pluginapi.ResourceRequest) (bool, error) {
_, reqFloat64, err := util.GetPodAggregatedRequestResource(req)
if err != nil {
return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
Expand All @@ -1186,14 +1186,14 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource
state.WrapAllocationMetaFilter((*commonstate.AllocationMeta).CheckDedicated),
state.WrapAllocationMetaFilter((*commonstate.AllocationMeta).CheckSharedOrDedicatedNUMABinding))

general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
general.Infof("[checkNonBindingShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
if shareCoresAllocatedInt > pooledCPUs.Size() {
general.Warningf("[checkNormalShareCoresCpuResource] no enough cpu resource for normal share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)",
general.Warningf("[checkNonBindingShareCoresCpuResource] no enough cpu resource for non-binding share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)",
req.PodNamespace, req.PodName, req.ContainerName, reqFloat64, shareCoresAllocatedInt, pooledCPUs.Size())
return false, nil
}

general.InfoS("checkNormalShareCoresCpuResource memory successfully",
general.InfoS("checkNonBindingShareCoresCpuResource memory successfully",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"containerName", req.ContainerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
if allocationInfo.CheckSharedNUMABinding() {
poolEntry.QoSLevel = apiconsts.PodAnnotationQoSLevelSharedCores
// set SharedNUMABinding declarations to pool entry containing SharedNUMABinding containers,
// in order to differentiate them from normal share pools during GetFilteredPoolsCPUSetMap.
// in order to differentiate them from non-binding share cores pools during GetFilteredPoolsCPUSetMap.
poolEntry.Annotations = general.MergeMap(poolEntry.Annotations, map[string]string{
apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (p *DynamicPolicy) calcPoolResizeRequest(originAllocation, allocation *stat
allocation.PodNamespace, allocation.PodName, allocation.ContainerName, originPodAggregatedRequest, podAggregatedRequest)
}

// only support normal share and snb inplace update resize now
// only support share cores inplace update resize now (include non-binding share cores and share cores with NUMA binding)
if allocation.CheckSharedNUMABinding() {
// check snb numa migrate for inplace update resize
originTargetNumaID, err := state.GetSharedNUMABindingTargetNuma(originAllocation)
Expand Down Expand Up @@ -1136,7 +1136,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine
if allocationInfo.CheckSharedNUMABinding() {
poolEntry.QoSLevel = apiconsts.PodAnnotationQoSLevelSharedCores
// set SharedNUMABinding declarations to pool entry containing SharedNUMABinding containers,
// in order to differentiate them from normal share pools during GetFilteredPoolsCPUSetMap.
// in order to differentiate them from non-binding share cores pools during GetFilteredPoolsCPUSetMap.
poolEntry.Annotations = general.MergeMap(poolEntry.Annotations, map[string]string{
apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable,
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
return p.sharedCoresWithNUMABindingHintHandler(ctx, req)
}

// TODO: support sidecar follow main container for normal share cores in future
// TODO: support sidecar follow main container for non-binding share cores in future
if req.ContainerType == pluginapi.ContainerType_MAIN {
ok, err := p.checkNormalShareCoresCpuResource(req)
ok, err := p.checkNonBindingShareCoresCpuResource(req)
if err != nil {
general.Errorf("failed to check share cores cpu resource for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
Expand Down Expand Up @@ -775,11 +775,11 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi
allocatableCPUQuantity := machineState[nodeID].GetFilteredDefaultCPUSet(nil, nil).Difference(p.reservedCPUs).Size()

// take this non-binding NUMA for candicate shared_cores with numa_binding,
// won't cause normal shared_cores in short supply
// won't cause non-binding shared_cores in short supply
if nonBindingNUMAsCPUQuantity-allocatableCPUQuantity >= nonBindingSharedRequestedQuantity {
filteredNUMANodes = append(filteredNUMANodes, nodeID)
} else {
general.Infof("filter out NUMA: %d since taking it will cause normal shared_cores in short supply;"+
general.Infof("filter out NUMA: %d since taking it will cause non-binding shared_cores in short supply;"+
" nonBindingNUMAsCPUQuantity: %d, targetNUMAAllocatableCPUQuantity: %d, nonBindingSharedRequestedQuantity: %d",
nodeID, nonBindingNUMAsCPUQuantity, allocatableCPUQuantity, nonBindingSharedRequestedQuantity)
}
Expand Down Expand Up @@ -817,7 +817,7 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(request float64,
}

// if a numa_binding shared_cores has request larger than 1 NUMA,
// its performance may degrade to be like normal shared_cores
// its performance may degrade to be like non-binding shared_cores
if minNUMAsCountNeeded > 1 {
return nil, fmt.Errorf("numa_binding shared_cores container has request larger than 1 NUMA")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (pe PodEntries) GetFilteredPoolsCPUSetMap(ignorePools sets.String) (map[str

// pool entry containing SharedNUMABinding containers also has SharedNUMABinding declarations,
// it's also applicable to isolation pools for SharedNUMABinding containers.
// it helps us to differentiate them from normal share pools when getting targetNUMAID for the pool.
// it helps us to differentiate them from non-binding share cores pools when getting targetNUMAID for the pool.
if allocationInfo.CheckSharedNUMABinding() {
// pool for numa_binding shared_cores containers

Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,11 @@ func TestSNBVPAWithSidecar(t *testing.T) {
as.Nil(err)
}

func TestNormalShareVPA(t *testing.T) {
func TestNonBindingShareCoresVPA(t *testing.T) {
t.Parallel()
as := require.New(t)

tmpDir, err := ioutil.TempDir("", "checkpoint-TestNormalShareVPA")
tmpDir, err := ioutil.TempDir("", "checkpoint-TestNonBindingShareCoresVPA")
as.Nil(err)
defer func() { _ = os.RemoveAll(tmpDir) }()

Expand All @@ -639,7 +639,7 @@ func TestNormalShareVPA(t *testing.T) {

testName := "test"

// test for normal share
// test for non-binding share cores
req := &pluginapi.ResourceRequest{
PodUid: string(uuid.NewUUID()),
PodNamespace: testName,
Expand All @@ -659,7 +659,7 @@ func TestNormalShareVPA(t *testing.T) {
},
}

// no topology hints for normal share
// no topology hints for non-binding share cores
res, err := dynamicPolicy.GetTopologyHints(context.Background(), req)
as.Nil(err)
as.Nil(res.ResourceHints[string(v1.ResourceCPU)])
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestNormalShareVPA(t *testing.T) {

resizeResp1, err := dynamicPolicy.GetTopologyHints(context.Background(), resizeReq1)
as.Nil(err)
// no hints for normal share
// no hints for non-binding share cores
as.Nil(resizeResp1.ResourceHints[string(v1.ResourceCPU)])

_, err = dynamicPolicy.Allocate(context.Background(), resizeReq1)
Expand Down Expand Up @@ -761,11 +761,11 @@ func TestNormalShareVPA(t *testing.T) {
as.Equal(float64(3), allocation.RequestQuantity)
}

func TestNormalShareVPAWithSidecar(t *testing.T) {
func TestNonBindingShareCoresVPAWithSidecar(t *testing.T) {
t.Parallel()
as := require.New(t)

tmpDir, err := ioutil.TempDir("", "checkpoint-TestNormalShareVPAWithSidecar")
tmpDir, err := ioutil.TempDir("", "checkpoint-TestNonBindingShareCoresVPAWithSidecar")
as.Nil(err)
defer func() { _ = os.RemoveAll(tmpDir) }()

Expand Down
13 changes: 8 additions & 5 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ type DynamicPolicy struct {

enableEvictingLogCache bool
logCacheEvictionManager logcache.Manager

enableNonBindingShareCoresMemoryResourceCheck bool
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
Expand Down Expand Up @@ -224,6 +226,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
enableOOMPriority: conf.EnableOOMPriority,
oomPriorityMapPinnedPath: conf.OOMPriorityPinnedMapAbsPath,
enableEvictingLogCache: conf.EnableEvictingLogCache,
enableNonBindingShareCoresMemoryResourceCheck: conf.EnableNonBindingShareCoresMemoryResourceCheck,
}

policyImplement.allocationHandlers = map[string]util.AllocationHandler{
Expand Down Expand Up @@ -1052,7 +1055,7 @@ func (p *DynamicPolicy) getContainerRequestedMemoryBytes(allocationInfo *state.A
}
}
} else {
// normal share cores pod
// non-binding share cores pod
requestBytes, err := p.getContainerSpecMemoryRequestBytes(allocationInfo.PodUid, allocationInfo.ContainerName)
if err != nil {
general.Errorf("[other] get container failed with error: %v, return the origin value", err)
Expand Down Expand Up @@ -1121,7 +1124,7 @@ func (p *DynamicPolicy) hasLastLevelEnhancementKey(lastLevelEnhancementKey strin
return false
}

func (p *DynamicPolicy) checkNormalShareCoresResource(req *pluginapi.ResourceRequest) (bool, error) {
func (p *DynamicPolicy) checkNonBindingShareCoresMemoryResource(req *pluginapi.ResourceRequest) (bool, error) {
reqInt, _, err := util.GetPodAggregatedRequestResource(req)
if err != nil {
return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
Expand Down Expand Up @@ -1150,14 +1153,14 @@ func (p *DynamicPolicy) checkNormalShareCoresResource(req *pluginapi.ResourceReq
numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable
}

general.Infof("[checkNormalShareCoresResource] node memory allocated: %d, allocatable: %d", shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
general.Infof("[checkNonBindingShareCoresMemoryResource] node memory allocated: %d, allocatable: %d", shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
if shareCoresAllocated > numaAllocatableWithoutNUMABindingPods {
general.Warningf("[checkNormalShareCoresResource] no enough memory resource for normal share cores pod: %s/%s, container: %s (allocated: %d, allocatable: %d)",
general.Warningf("[checkNonBindingShareCoresMemoryResource] no enough memory resource for non-binding share cores pod: %s/%s, container: %s (allocated: %d, allocatable: %d)",
req.PodNamespace, req.PodName, req.ContainerName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
return false, nil
}

general.InfoS("checkNormalShareCoresResource memory successfully",
general.InfoS("checkNonBindingShareCoresMemoryResource memory successfully",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"containerName", req.ContainerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,10 @@ func (p *DynamicPolicy) adjustAllocationEntriesForSharedCores(numaSetChangedCont
}

if !allocationInfo.CheckNUMABinding() {
// update container to target numa set for normal share cores
// update container to target numa set for non-binding share cores
p.updateNUMASetChangedContainers(numaSetChangedContainers, allocationInfo, numaWithoutNUMABindingPods)

// update AggregatedQuantity for normal share cores
// update AggregatedQuantity for non-binding share cores
allocationInfo.AggregatedQuantity = p.getContainerRequestedMemoryBytes(allocationInfo)
} else {
// memory of sidecar in snb pod is belonged to main container so we don't need to adjust it
Expand Down Expand Up @@ -679,7 +679,7 @@ func (p *DynamicPolicy) adjustAllocationEntriesForDedicatedCores(numaSetChangedC

if !allocationInfo.CheckNUMABinding() {
// not to adjust NUMA binding containers
// update container to target numa set for normal share cores
// update container to target numa set for non-binding share cores
p.updateNUMASetChangedContainers(numaSetChangedContainers, allocationInfo, numaWithoutNUMABindingPods)
}
}
Expand Down
30 changes: 16 additions & 14 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,24 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
return p.numaBindingHintHandler(ctx, req)
}

// TODO: support sidecar follow main container for normal share cores in future
// TODO: support sidecar follow main container for non-binding share cores in future
if req.ContainerType == pluginapi.ContainerType_MAIN {
ok, err := p.checkNormalShareCoresResource(req)
if err != nil {
general.Errorf("failed to check share cores resource: %q", err)
return nil, fmt.Errorf("failed to check share cores resource: %q", err)
}
if p.enableNonBindingShareCoresMemoryResourceCheck {
ok, err := p.checkNonBindingShareCoresMemoryResource(req)
if err != nil {
general.Errorf("failed to check share cores resource: %q", err)
return nil, fmt.Errorf("failed to check share cores resource: %q", err)
}

if !ok {
_ = p.emitter.StoreInt64(util.MetricNameShareCoresNoEnoughResourceFailed, 1, metrics.MetricTypeNameCount, metrics.ConvertMapToTags(map[string]string{
"resource": v1.ResourceMemory.String(),
"podNamespace": req.PodNamespace,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, errNoAvailableMemoryHints
if !ok {
_ = p.emitter.StoreInt64(util.MetricNameShareCoresNoEnoughResourceFailed, 1, metrics.MetricTypeNameCount, metrics.ConvertMapToTags(map[string]string{
"resource": v1.ResourceMemory.String(),
"podNamespace": req.PodNamespace,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, errNoAvailableMemoryHints
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi
migratingMemory: make(map[string]map[string]bool),
stopCh: make(chan struct{}),
podDebugAnnoKeys: []string{podDebugAnnoKey},
enableNonBindingShareCoresMemoryResourceCheck: true,
}

policyImplement.allocationHandlers = map[string]util.AllocationHandler{
Expand Down Expand Up @@ -4090,7 +4091,7 @@ func Test_getContainerRequestedMemoryBytes(t *testing.T) {

memorySize1G := resource.MustParse("1Gi")

// check normal share cores
// check non-binding share cores
pod1Allocation := &state.AllocationInfo{
AllocationMeta: commonstate.AllocationMeta{
PodUid: "test-pod-1-uid",
Expand Down Expand Up @@ -4137,16 +4138,16 @@ func Test_getContainerRequestedMemoryBytes(t *testing.T) {
}
metaServer.PodFetcher = podFetcher

// case 2. check normal share cores scale out success
// case 2. check non-binding share cores scale out success
as.Equal(uint64(memorySize1G.Value()), dynamicPolicy.getContainerRequestedMemoryBytes(pod1Allocation))

// case 3. check normal share cores scale in
// case 3. check non-binding share cores scale in
memorySize2G := resource.MustParse("2Gi")
pod1Allocation.AggregatedQuantity = uint64(memorySize2G.Value())
dynamicPolicy.state.SetAllocationInfo(v1.ResourceMemory, "test-pod-1-uid", "test-pod-1", pod1Allocation)
as.Equal(uint64(memorySize1G.Value()), dynamicPolicy.getContainerRequestedMemoryBytes(pod1Allocation))

// check normal snb
// check snb
pod2Allocation := &state.AllocationInfo{
AllocationMeta: commonstate.AllocationMeta{
PodUid: "test-pod-2-uid",
Expand Down
Loading

0 comments on commit 1f93cf4

Please sign in to comment.