From 667b91a0e15b3d4d3a2329c1b251760c61c0e922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=81=A5=E4=BF=9E?= Date: Wed, 17 Jan 2024 20:44:32 +0800 Subject: [PATCH 1/2] perf(qrm): improve performance of calculating hints --- .../cpu/dynamicpolicy/policy_hint_handlers.go | 65 +++--- .../cpu/dynamicpolicy/policy_test.go | 121 ++++++++++ .../cpu/nativepolicy/policy_hint_handlers.go | 4 +- .../dynamicpolicy/policy_hint_handlers.go | 56 +++-- .../memory/dynamicpolicy/policy_test.go | 138 +++++++++++ pkg/agent/qrm-plugins/util/util_test.go | 7 +- pkg/util/machine/bitmask.go | 216 ++++++++++++++++++ pkg/util/machine/topology.go | 46 +++- pkg/util/machine/util.go | 6 +- pkg/util/machine/util_test.go | 4 +- 10 files changed, 592 insertions(+), 71 deletions(-) create mode 100644 pkg/util/machine/bitmask.go diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index 2b8a99273..d8ff5b81d 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" @@ -170,26 +169,52 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err) } + numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) + numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) + // because it's hard to control memory allocation accurately, // we only support numa_binding but not exclusive container with request smaller than 1 NUMA - if qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) && - 
!qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && - minNUMAsCountNeeded > 1 { + if numaBinding && !numaExclusive && minNUMAsCountNeeded > 1 { return nil, fmt.Errorf("NUMA not exclusive binding container has request larger than 1 NUMA") } - numaPerSocket, err := p.machineInfo.NUMAsPerSocket() + numasPerSocket, err := p.machineInfo.NUMAsPerSocket() if err != nil { return nil, fmt.Errorf("NUMAsPerSocket failed with error: %v", err) } - bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) { + numaToAvailableCPUCount := make(map[int]int, len(numaNodes)) + + for _, nodeID := range numaNodes { + if machineState[nodeID] == nil { + general.Warningf("NUMA: %d has nil state", nodeID) + numaToAvailableCPUCount[nodeID] = 0 + continue + } + + if numaExclusive && machineState[nodeID].AllocatedCPUSet.Size() > 0 { + numaToAvailableCPUCount[nodeID] = 0 + general.Warningf("numa_exclusive container skip NUMA: %d allocated: %d", + nodeID, machineState[nodeID].AllocatedCPUSet.Size()) + } else { + numaToAvailableCPUCount[nodeID] = machineState[nodeID].GetAvailableCPUSet(p.reservedCPUs).Size() + } + } + + general.Infof("calculate hints with req: %d, numaToAvailableCPUCount: %+v", + reqInt, numaToAvailableCPUCount) + + numaBound := len(numaNodes) + if numaBound > machine.LargeNUMAsPoint { + // [TODO]: to discuss refine minNUMAsCountNeeded+1 + numaBound = minNUMAsCountNeeded + 1 + } + + machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { return - } else if qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) && - !qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && - maskCount > 1 { + } else if numaBinding && !numaExclusive && maskCount > 1 { // because it's hard to control memory allocation accurately, // we only support numa_binding but not exclusive container with request smaller than 1 NUMA return @@ -198,33 +223,19 @@ func (p *DynamicPolicy) calculateHints(reqInt int, 
machineState state.NUMANodeMa maskBits := mask.GetBits() numaCountNeeded := mask.Count() - allAvailableCPUsInMask := machine.NewCPUSet() + allAvailableCPUsCountInMask := 0 for _, nodeID := range maskBits { - if machineState[nodeID] == nil { - general.Warningf("NUMA: %d has nil state", nodeID) - return - } else if qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && machineState[nodeID].AllocatedCPUSet.Size() > 0 { - general.Warningf("numa_exclusive container skip mask: %s with NUMA: %d allocated: %d", - mask.String(), nodeID, machineState[nodeID].AllocatedCPUSet.Size()) - return - } - - allAvailableCPUsInMask = allAvailableCPUsInMask.Union(machineState[nodeID].GetAvailableCPUSet(p.reservedCPUs)) + allAvailableCPUsCountInMask += numaToAvailableCPUCount[nodeID] } - if allAvailableCPUsInMask.Size() < reqInt { - general.InfofV(4, "available cpuset: %s of size: %d excluding NUMA binding pods which is smaller than request: %d", - allAvailableCPUsInMask.String(), allAvailableCPUsInMask.Size(), reqInt) + if allAvailableCPUsCountInMask < reqInt { return } crossSockets, err := machine.CheckNUMACrossSockets(maskBits, p.machineInfo.CPUTopology) if err != nil { - general.Errorf("CheckNUMACrossSockets failed with error: %v", err) return - } else if numaCountNeeded <= numaPerSocket && crossSockets { - general.InfofV(4, "needed: %d; min-needed: %d; NUMAs: %v cross sockets with numaPerSocket: %d", - numaCountNeeded, minNUMAsCountNeeded, maskBits, numaPerSocket) + } else if numaCountNeeded <= numasPerSocket && crossSockets { return } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index b5fbfdca5..ac8be7edd 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -33,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/klog/v2" pluginapi 
"k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" utilfs "k8s.io/kubernetes/pkg/util/filesystem" @@ -59,6 +60,13 @@ const ( podDebugAnnoKey = "qrm.katalyst.kubewharf.io/debug_pod" ) +type cpuTestCase struct { + cpuNum int + socketNum int + numaNum int + fakeNUMANum int +} + func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) { dynamicPolicy, err := getTestDynamicPolicyWithoutInitialization(topology, stateFileDirectory) if err != nil { @@ -4606,3 +4614,116 @@ func TestShoudSharedCoresRampUp(t *testing.T) { as.Equal(false, allocationInfo.RampUp) as.Equal(allocationInfo.OwnerPoolName, state.PoolNameShare) } + +func BenchmarkGetTopologyHints(b *testing.B) { + klog.SetOutput(ioutil.Discard) + klog.V(0) + klog.LogToStderr(false) + cpuCases := []cpuTestCase{ + { + cpuNum: 96, + socketNum: 2, + numaNum: 4, + fakeNUMANum: 4, + }, + //{ + // cpuNum: 128, + // socketNum: 2, + // numaNum: 4, + // fakeNUMANum: 4, + //}, + //{ + // cpuNum: 192, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 8, + //}, + { + cpuNum: 384, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 8, + }, + { + cpuNum: 384, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 16, + }, + { + cpuNum: 384, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 24, + }, + //{ + // cpuNum: 512, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 8, + //}, + //{ + // cpuNum: 512, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 16, + //}, + { + cpuNum: 512, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 32, + }, + } + + testName := "test" + + req := &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(v1.ResourceCPU), + ResourceRequests: map[string]float64{}, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + 
consts.PodAnnotationMemoryEnhancementKey: `{"numa_binding": "true", "numa_exclusive": "true"}`, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + } + + for _, tc := range cpuCases { + tmpDir, _ := ioutil.TempDir("", "checkpoint-BenchmarkGetTopologyHints") + + cpuTopology, _ := machine.GenerateDummyCPUTopology(tc.cpuNum, tc.socketNum, tc.fakeNUMANum) + + cpusPerNUMA := cpuTopology.NumCPUs / cpuTopology.NumNUMANodes + + dynamicPolicy, _ := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir) + + for _, numaNeeded := range []int{1, 2, 4} { + req.ResourceRequests[string(v1.ResourceCPU)] = float64(numaNeeded*cpusPerNUMA - 1) + req.Annotations = map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{"numa_binding": "true", "numa_exclusive": "true"}`, + } + req.Labels = map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + } + + b.Run(fmt.Sprintf("%d cpus, %d sockets, %d NUMAs, %d fake-NUMAs, %d NUMAs package", + tc.cpuNum, tc.socketNum, tc.numaNum, tc.fakeNUMANum, numaNeeded), func(b *testing.B) { + for i := 0; i < b.N; i++ { + dynamicPolicy.GetTopologyHints(context.Background(), req) + } + }) + + } + + _ = os.RemoveAll(tmpDir) + } +} diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_hint_handlers.go index 334984a75..2b313dfd3 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_hint_handlers.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" nativepolicyutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/nativepolicy/util" cpuutil 
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" @@ -112,8 +111,9 @@ func (p *NativePolicy) generateCPUTopologyHints(availableCPUs machine.CPUSet, re }, } + numaNodes := p.machineInfo.CPUDetails.NUMANodes().ToSliceInt() // Iterate through all combinations of numa nodes bitmask and build hints from them. - bitmask.IterateBitMasks(p.machineInfo.CPUDetails.NUMANodes().ToSliceInt(), func(mask bitmask.BitMask) { + machine.IterateBitMasks(numaNodes, len(numaNodes), func(mask machine.BitMask) { // First, update minAffinitySize for the current request size. cpusInMask := p.machineInfo.CPUDetails.CPUsInNUMANodes(mask.GetBits()...).Size() if cpusInMask >= request && mask.Count() < minAffinitySize { diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go index 4e4bf18de..32a5f3278 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" @@ -181,11 +180,12 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat return nil, fmt.Errorf("GetNUMANodesCountToFitMemoryReq failed with error: %v", err) } + numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) + numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) + // because it's hard to control memory allocation accurately, // we only support numa_binding but not exclusive container with request smaller than 1 NUMA - if qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) && - !qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && - 
minNUMAsCountNeeded > 1 { + if numaBinding && !numaExclusive && minNUMAsCountNeeded > 1 { return nil, fmt.Errorf("NUMA not exclusive binding container has request larger than 1 NUMA") } @@ -194,13 +194,38 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat return nil, fmt.Errorf("NUMAsPerSocket failed with error: %v", err) } - bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) { + numaToFreeMemoryBytes := make(map[int]uint64, len(numaNodes)) + + for _, nodeID := range numaNodes { + if machineState[nodeID] == nil { + general.Warningf("NUMA: %d has nil state", nodeID) + numaToFreeMemoryBytes[nodeID] = 0 + continue + } + + if numaExclusive && machineState[nodeID].Allocated > 0 { + numaToFreeMemoryBytes[nodeID] = 0 + general.Warningf("numa_exclusive container skip NUMA: %d allocated: %d", + nodeID, machineState[nodeID].Allocated) + } else { + numaToFreeMemoryBytes[nodeID] = machineState[nodeID].Free + } + } + + general.Infof("calculate hints with req: %d, numaToFreeMemoryBytes: %+v", + reqInt, numaToFreeMemoryBytes) + + numaBound := len(numaNodes) + if numaBound > machine.LargeNUMAsPoint { + // [TODO]: to discuss refine minNUMAsCountNeeded+1 + numaBound = minNUMAsCountNeeded + 1 + } + + machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { return - } else if qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) && - !qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && - maskCount > 1 { + } else if numaBinding && !numaExclusive && maskCount > 1 { // because it's hard to control memory allocation accurately, // we only support numa_binding but not exclusive container with request smaller than 1 NUMA return @@ -211,30 +236,17 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat var freeBytesInMask uint64 = 0 for _, nodeID := range maskBits { - if machineState[nodeID] == nil { - general.Warningf("NUMA: %d has 
nil state", nodeID) - return - } else if qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) && machineState[nodeID].Allocated > 0 { - general.Warningf("numa_exclusive container skip mask: %s with NUMA: %d allocated: %d", - mask.String(), nodeID, machineState[nodeID].Allocated) - return - } - - freeBytesInMask += machineState[nodeID].Free + freeBytesInMask += numaToFreeMemoryBytes[nodeID] } if freeBytesInMask < reqInt { - general.InfofV(4, "free bytes: %d in mask are smaller than request bytes: %d", freeBytesInMask, reqInt) return } crossSockets, err := machine.CheckNUMACrossSockets(maskBits, p.topology) if err != nil { - general.Errorf("CheckNUMACrossSockets failed with error: %v", err) return } else if numaCountNeeded <= numaPerSocket && crossSockets { - general.InfofV(4, "needed: %d; min-needed: %d; NUMAs: %v cross sockets with numaPerSocket: %d", - numaCountNeeded, minNUMAsCountNeeded, maskBits, numaPerSocket) return } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 7a25ba1bf..62d31a403 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/klog/v2" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" maputil "k8s.io/kubernetes/pkg/util/maps" @@ -78,6 +79,14 @@ const ( podDebugAnnoKey = "qrm.katalyst.kubewharf.io/debug_pod" ) +type topoTestCase struct { + cpuNum int + socketNum int + numaNum int + fakeNUMANum int + memGB int +} + var fakeConf = &config.Configuration{ AgentConfiguration: &configagent.AgentConfiguration{ GenericAgentConfiguration: &configagent.GenericAgentConfiguration{ @@ -3570,3 +3579,132 @@ func TestDynamicPolicy_adjustAllocationEntries(t *testing.T) { }) } } + +func BenchmarkGetTopologyHints(b *testing.B) { + 
klog.SetOutput(ioutil.Discard) + klog.V(0) + klog.LogToStderr(false) + topoCases := []topoTestCase{ + //{ + // cpuNum: 96, + // socketNum: 2, + // numaNum: 4, + // fakeNUMANum: 4, + // memGB: 384, + //}, + //{ + // cpuNum: 128, + // socketNum: 2, + // numaNum: 4, + // fakeNUMANum: 4, + // memGB: 512, + //}, + //{ + // cpuNum: 192, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 8, + // memGB: 768, + //}, + //{ + // cpuNum: 384, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 8, + // memGB: 1536, + //}, + //{ + // cpuNum: 384, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 16, + // memGB: 1536, + //}, + //{ + // cpuNum: 384, + // socketNum: 2, + // numaNum: 8, + // fakeNUMANum: 24, + // memGB: 1536, + //}, + { + cpuNum: 512, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 8, + memGB: 2048, + }, + { + cpuNum: 512, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 16, + memGB: 2048, + }, + { + cpuNum: 512, + socketNum: 2, + numaNum: 8, + fakeNUMANum: 32, + memGB: 2048, + }, + } + + testName := "test" + + req := &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(v1.ResourceMemory), + ResourceRequests: map[string]float64{}, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{"numa_binding": "true", "numa_exclusive": "true"}`, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + } + + for _, tc := range topoCases { + tmpDir, _ := ioutil.TempDir("", "checkpoint-BenchmarkGetTopologyHints") + + cpuTopology, _ := machine.GenerateDummyCPUTopology(tc.cpuNum, tc.socketNum, tc.fakeNUMANum) + + machineInfo, _ := machine.GenerateDummyMachineInfo(tc.fakeNUMANum, tc.memGB) + + memGBPerNUMA := tc.memGB / cpuTopology.NumNUMANodes 
+ + dynamicPolicy, _ := getTestDynamicPolicyWithInitialization(cpuTopology, machineInfo, tmpDir) + + for _, numaNeeded := range []int{1, 2, 4} { + memQuantity := resource.MustParse(fmt.Sprintf("%dGi", memGBPerNUMA)) + req.ResourceRequests[string(v1.ResourceMemory)] = float64(memQuantity.MilliValue())/1000.0 - 1 + req.Annotations = map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{"numa_binding": "true", "numa_exclusive": "true"}`, + } + req.Labels = map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + } + + b.Run(fmt.Sprintf("%d cpus, %d sockets, %d NUMAs, %d fake-NUMAs, %d NUMAs package", + tc.cpuNum, tc.socketNum, tc.numaNum, tc.fakeNUMANum, numaNeeded), func(b *testing.B) { + for i := 0; i < b.N; i++ { + dynamicPolicy.GetTopologyHints(context.Background(), req) + } + }) + + } + + _ = os.RemoveAll(tmpDir) + } +} + +func TestDummy(t *testing.T) { + fmt.Println("test") +} diff --git a/pkg/agent/qrm-plugins/util/util_test.go b/pkg/agent/qrm-plugins/util/util_test.go index 044e45ab8..23001dfd4 100644 --- a/pkg/agent/qrm-plugins/util/util_test.go +++ b/pkg/agent/qrm-plugins/util/util_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/util/machine" @@ -171,17 +170,17 @@ func TestMaskToUInt64Array(t *testing.T) { as := require.New(t) - nonEmptyMask, err := bitmask.NewBitMask(0, 1, 2, 3) + nonEmptyMask, err := machine.NewBitMask(0, 1, 2, 3) as.Nil(err) testCases := []struct { description string - mask bitmask.BitMask + mask machine.BitMask expectedArray []uint64 }{ { description: "empty mask", - mask: bitmask.NewEmptyBitMask(), + mask: machine.NewEmptyBitMask(), expectedArray: 
[]uint64{}, }, { diff --git a/pkg/util/machine/bitmask.go b/pkg/util/machine/bitmask.go new file mode 100644 index 000000000..1e8601eca --- /dev/null +++ b/pkg/util/machine/bitmask.go @@ -0,0 +1,216 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "fmt" + "math/bits" + "strconv" +) + +// BitMask interface allows hint providers to create BitMasks for TopologyHints +type BitMask interface { + Add(bits ...int) error + Remove(bits ...int) error + And(masks ...BitMask) + Or(masks ...BitMask) + Clear() + Fill() + IsEqual(mask BitMask) bool + IsEmpty() bool + IsSet(bit int) bool + AnySet(bits []int) bool + IsNarrowerThan(mask BitMask) bool + String() string + Count() int + GetBits() []int +} + +type bitMask uint64 + +// NewEmptyBitMask creates a new, empty BitMask +func NewEmptyBitMask() BitMask { + s := bitMask(0) + return &s +} + +// NewBitMask creates a new BitMask +func NewBitMask(bits ...int) (BitMask, error) { + s := bitMask(0) + err := (&s).Add(bits...) 
+ if err != nil { + return nil, err + } + return &s, nil +} + +// Add adds the bits with topology affinity to the BitMask +func (s *bitMask) Add(bits ...int) error { + mask := *s + for _, i := range bits { + if i < 0 || i >= 64 { + return fmt.Errorf("bit number must be in range 0-63") + } + mask |= 1 << uint64(i) + } + *s = mask + return nil +} + +// Remove removes specified bits from BitMask +func (s *bitMask) Remove(bits ...int) error { + mask := *s + for _, i := range bits { + if i < 0 || i >= 64 { + return fmt.Errorf("bit number must be in range 0-63") + } + mask &^= 1 << uint64(i) + } + *s = mask + return nil +} + +// And performs and operation on all bits in masks +func (s *bitMask) And(masks ...BitMask) { + for _, m := range masks { + *s &= *m.(*bitMask) + } +} + +// Or performs or operation on all bits in masks +func (s *bitMask) Or(masks ...BitMask) { + for _, m := range masks { + *s |= *m.(*bitMask) + } +} + +// Clear resets all bits in mask to zero +func (s *bitMask) Clear() { + *s = 0 +} + +// Fill sets all bits in mask to one +func (s *bitMask) Fill() { + *s = bitMask(^uint64(0)) +} + +// IsEmpty checks mask to see if all bits are zero +func (s *bitMask) IsEmpty() bool { + return *s == 0 +} + +// IsSet checks bit in mask to see if bit is set to one +func (s *bitMask) IsSet(bit int) bool { + if bit < 0 || bit >= 64 { + return false + } + return (*s & (1 << uint64(bit))) > 0 +} + +// AnySet checks bit in mask to see if any provided bit is set to one +func (s *bitMask) AnySet(bits []int) bool { + for _, b := range bits { + if s.IsSet(b) { + return true + } + } + return false +} + +// IsEqual checks if masks are equal +func (s *bitMask) IsEqual(mask BitMask) bool { + return *s == *mask.(*bitMask) +} + +// IsNarrowerThan checks if one mask is narrower than another. +// +// A mask is said to be "narrower" than another if it has fewer bits set. If the +// same number of bits are set in both masks, then the mask with more +// lower-numbered bits set wins out. 
+func (s *bitMask) IsNarrowerThan(mask BitMask) bool { + if s.Count() == mask.Count() { + if *s < *mask.(*bitMask) { + return true + } + } + return s.Count() < mask.Count() +} + +// String converts mask to string +func (s *bitMask) String() string { + grouping := 2 + for shift := 64 - grouping; shift > 0; shift -= grouping { + if *s > (1 << uint(shift)) { + return fmt.Sprintf("%0"+strconv.Itoa(shift+grouping)+"b", *s) + } + } + return fmt.Sprintf("%0"+strconv.Itoa(grouping)+"b", *s) +} + +// Count counts number of bits in mask set to one +func (s *bitMask) Count() int { + return bits.OnesCount64(uint64(*s)) +} + +// GetBits returns each bit number with bits set to one +func (s *bitMask) GetBits() []int { + bits := make([]int, 0, 64) + for i := uint64(0); i < 64; i++ { + if (*s & (1 << i)) > 0 { + bits = append(bits, int(i)) + } + } + return bits +} + +// And is a package level implementation of 'and' between first and masks +func And(first BitMask, masks ...BitMask) BitMask { + s := *first.(*bitMask) + s.And(masks...) + return &s +} + +// Or is a package level implementation of 'or' between first and masks +func Or(first BitMask, masks ...BitMask) BitMask { + s := *first.(*bitMask) + s.Or(masks...) + return &s +} + +// IterateBitMasks iterates all possible masks from a list of bits, +// issuing a callback on each mask. +func IterateBitMasks(bits []int, maxBits int, callback func(BitMask)) { + var iterate func(bits, accum []int, size int) + iterate = func(bits, accum []int, size int) { + if len(accum) == size { + mask, _ := NewBitMask(accum...) 
+ callback(mask) + return + } + for i := range bits { + iterate(bits[i+1:], append(accum, bits[i]), size) + } + } + + if len(bits) < maxBits { + maxBits = len(bits) + } + + for i := 1; i <= maxBits; i++ { + iterate(bits, []int{}, i) + } +} diff --git a/pkg/util/machine/topology.go b/pkg/util/machine/topology.go index 84e187bfa..21287f85c 100644 --- a/pkg/util/machine/topology.go +++ b/pkg/util/machine/topology.go @@ -42,11 +42,12 @@ type CPUDetails map[int]CPUInfo // Socket - socket, cadvisor - Socket // NUMA Node - NUMA cell, cadvisor - Node type CPUTopology struct { - NumCPUs int - NumCores int - NumSockets int - NumNUMANodes int - CPUDetails CPUDetails + NumCPUs int + NumCores int + NumSockets int + NumNUMANodes int + NUMANodeIDToSocketID map[int]int + CPUDetails CPUDetails } type MemoryDetails map[int]uint64 @@ -194,6 +195,7 @@ func GenerateDummyCPUTopology(cpuNum, socketNum, numaNum int) (*CPUTopology, err cpuTopology.NumCores = cpuNum / 2 cpuTopology.NumSockets = socketNum cpuTopology.NumNUMANodes = numaNum + cpuTopology.NUMANodeIDToSocketID = make(map[int]int, numaNum) numaPerSocket := numaNum / socketNum cpusPerNUMA := cpuNum / numaNum @@ -212,6 +214,8 @@ func GenerateDummyCPUTopology(cpuNum, socketNum, numaNum int) (*CPUTopology, err SocketID: i, CoreID: k, } + + cpuTopology.NUMANodeIDToSocketID[j] = i } } } @@ -426,6 +430,7 @@ func Discover(machineInfo *info.MachineInfo) (*CPUTopology, *MemoryTopology, err } CPUDetails := CPUDetails{} + numaNodeIDToSocketID := make(map[int]int, len(machineInfo.Topology)) numPhysicalCores := 0 memoryTopology := MemoryTopology{MemoryDetails: map[int]uint64{}} @@ -442,6 +447,8 @@ func Discover(machineInfo *info.MachineInfo) (*CPUTopology, *MemoryTopology, err SocketID: core.SocketID, NUMANodeID: node.Id, } + + numaNodeIDToSocketID[node.Id] = core.SocketID } } else { klog.ErrorS(nil, "Could not get unique coreID for socket", @@ -452,11 +459,12 @@ func Discover(machineInfo *info.MachineInfo) (*CPUTopology, *MemoryTopology, err 
} return &CPUTopology{ - NumCPUs: machineInfo.NumCores, - NumSockets: machineInfo.NumSockets, - NumCores: numPhysicalCores, - NumNUMANodes: CPUDetails.NUMANodes().Size(), - CPUDetails: CPUDetails, + NumCPUs: machineInfo.NumCores, + NumSockets: machineInfo.NumSockets, + NumCores: numPhysicalCores, + NumNUMANodes: CPUDetails.NUMANodes().Size(), + NUMANodeIDToSocketID: numaNodeIDToSocketID, + CPUDetails: CPUDetails, }, &memoryTopology, nil } @@ -510,7 +518,23 @@ func CheckNUMACrossSockets(numaNodes []int, cpuTopology *CPUTopology) (bool, err if len(numaNodes) <= 1 { return false, nil } - return cpuTopology.CPUDetails.SocketsInNUMANodes(numaNodes...).Size() > 1, nil + + visSocketID := -1 + for _, numaNode := range numaNodes { + socketID, found := cpuTopology.NUMANodeIDToSocketID[numaNode] + + if !found { + return false, fmt.Errorf("no corresponding SocketID for NUMA: %d", numaNode) + } + + if visSocketID != -1 && socketID != visSocketID { + return true, nil + } + + visSocketID = socketID + } + + return false, nil } func GetSiblingNumaInfo(conf *global.MachineInfoConfiguration, diff --git a/pkg/util/machine/util.go b/pkg/util/machine/util.go index 4884511ee..b7e559b6e 100644 --- a/pkg/util/machine/util.go +++ b/pkg/util/machine/util.go @@ -18,8 +18,10 @@ package machine import ( "fmt" +) - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" +const ( + LargeNUMAsPoint = 16 ) // TransformCPUAssignmentFormat transforms cpu assignment string format to cpuset format @@ -94,7 +96,7 @@ func GetCPUAssignmentNUMAs(assignment map[int]CPUSet) CPUSet { } // MaskToUInt64Array transforms bit mask to uint slices -func MaskToUInt64Array(mask bitmask.BitMask) []uint64 { +func MaskToUInt64Array(mask BitMask) []uint64 { maskBits := mask.GetBits() maskBitsUint64 := make([]uint64, 0, len(maskBits)) diff --git a/pkg/util/machine/util_test.go b/pkg/util/machine/util_test.go index 48e97dc04..cc34371b3 100644 --- a/pkg/util/machine/util_test.go +++ b/pkg/util/machine/util_test.go @@ 
-20,8 +20,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" ) func TestParseCPUAssignmentFormat(t *testing.T) { @@ -50,7 +48,7 @@ func TestDeepcopyCPUAssignment(t *testing.T) { func TestMaskToUInt64Array(t *testing.T) { t.Parallel() - mask, err := bitmask.NewBitMask(0, 1, 2, 3) + mask, err := NewBitMask(0, 1, 2, 3) assert.NoError(t, err) assert.Equal(t, []uint64{0, 1, 2, 3}, MaskToUInt64Array(mask)) } From 6196bbfa3228d676885e3723c0854c6c618c45e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=81=A5=E4=BF=9E?= Date: Mon, 25 Mar 2024 20:08:33 +0800 Subject: [PATCH 2/2] feat(qrm): involve memory bandwidth when admitting in cpu plugin --- .../app/options/qrm/qrm_base.go | 16 +- .../qrm-plugins/cpu/dynamicpolicy/policy.go | 8 +- .../cpu/dynamicpolicy/policy_hint_handlers.go | 255 ++++++++++- .../cpu/dynamicpolicy/policy_test.go | 397 +++++++++++++++++- .../memory/dynamicpolicy/policy.go | 10 +- .../memory/dynamicpolicy/policy_test.go | 4 - .../network/staticpolicy/policy.go | 9 +- pkg/agent/qrm-plugins/util/consts.go | 12 +- pkg/agent/qrm-plugins/util/util.go | 10 +- pkg/config/agent/qrm/qrm_base.go | 9 +- pkg/util/general/common.go | 15 + pkg/util/general/deepcopy.go | 12 + pkg/util/general/deepcopy_test.go | 60 +++ pkg/util/machine/util.go | 1 + 14 files changed, 788 insertions(+), 30 deletions(-) create mode 100644 pkg/util/general/deepcopy_test.go diff --git a/cmd/katalyst-agent/app/options/qrm/qrm_base.go b/cmd/katalyst-agent/app/options/qrm/qrm_base.go index 18824b40a..7bb4feaa1 100644 --- a/cmd/katalyst-agent/app/options/qrm/qrm_base.go +++ b/cmd/katalyst-agent/app/options/qrm/qrm_base.go @@ -28,13 +28,17 @@ type GenericQRMPluginOptions struct { ExtraStateFileAbsPath string PodDebugAnnoKeys []string UseKubeletReservedConfig bool + PodAnnotationKeptKeys []string + PodLabelKeptKeys []string } func NewGenericQRMPluginOptions() *GenericQRMPluginOptions { return &GenericQRMPluginOptions{ 
- QRMPluginSocketDirs: []string{"/var/lib/kubelet/plugins_registry"}, - StateFileDirectory: "/var/lib/katalyst/qrm_advisor", - PodDebugAnnoKeys: []string{}, + QRMPluginSocketDirs: []string{"/var/lib/kubelet/plugins_registry"}, + StateFileDirectory: "/var/lib/katalyst/qrm_advisor", + PodDebugAnnoKeys: []string{}, + PodAnnotationKeptKeys: []string{}, + PodLabelKeptKeys: []string{}, } } @@ -49,6 +53,10 @@ func (o *GenericQRMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.PodDebugAnnoKeys, "pod annotations keys to identify the pod is a debug pod, and qrm plugins will apply specific strategy to it") fs.BoolVar(&o.UseKubeletReservedConfig, "use-kubelet-reserved-config", o.UseKubeletReservedConfig, "if set true, we will prefer to use kubelet reserved config to reserved resource configuration in katalyst") + fs.StringSliceVar(&o.PodAnnotationKeptKeys, "pod-annotation-kept-keys", + o.PodAnnotationKeptKeys, "pod annotation keys will be kept in qrm state") + fs.StringSliceVar(&o.PodLabelKeptKeys, "pod-label-kept-keys", + o.PodLabelKeptKeys, "pod label keys will be kept in qrm state") } func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfiguration) error { @@ -57,6 +65,8 @@ func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfig conf.ExtraStateFileAbsPath = o.ExtraStateFileAbsPath conf.PodDebugAnnoKeys = o.PodDebugAnnoKeys conf.UseKubeletReservedConfig = o.UseKubeletReservedConfig + conf.PodAnnotationKeptKeys = o.PodAnnotationKeptKeys + conf.PodLabelKeptKeys = o.PodLabelKeptKeys return nil } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index dc5d12d08..b1db02658 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -127,6 +127,8 @@ type DynamicPolicy struct { qosConfig *generic.QoSConfiguration dynamicConfig *dynamicconfig.DynamicAgentConfiguration podDebugAnnoKeys []string + 
podAnnotationKeptKeys []string + podLabelKeptKeys []string transitionPeriod time.Duration cpuNUMAHintPreferPolicy string cpuNUMAHintPreferLowThreshold float64 @@ -200,6 +202,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration enableCPUIdle: conf.CPUQRMPluginConfig.EnableCPUIdle, reclaimRelativeRootCgroupPath: conf.ReclaimRelativeRootCgroupPath, podDebugAnnoKeys: conf.PodDebugAnnoKeys, + podAnnotationKeptKeys: conf.PodAnnotationKeptKeys, + podLabelKeptKeys: conf.PodLabelKeptKeys, transitionPeriod: 30 * time.Second, } @@ -612,7 +616,7 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context, // we should do it before GetKatalystQoSLevelFromResourceReq. isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys) - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -687,7 +691,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context, // we should do it before GetKatalystQoSLevelFromResourceReq. 
isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys) - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index d8ff5b81d..fe12eac77 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -23,6 +23,9 @@ import ( "sort" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" @@ -30,6 +33,9 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/spd" + "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos" @@ -131,7 +137,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte if hints == nil { var calculateErr error // calculate hint for container without allocated cpus - hints, calculateErr = p.calculateHints(reqInt, machineState, req.Annotations) + hints, calculateErr = p.calculateHints(reqInt, machineState, req) if calculateErr != 
nil { return nil, fmt.Errorf("calculateHints failed with error: %v", calculateErr) } @@ -149,9 +155,14 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co // calculateHints is a helper function to calculate the topology hints // with the given container requests. -func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMap, - reqAnnotations map[string]string, +func (p *DynamicPolicy) calculateHints(reqInt int, + machineState state.NUMANodeMap, + req *pluginapi.ResourceRequest, ) (map[string]*pluginapi.ListOfTopologyHints, error) { + if req == nil { + return nil, fmt.Errorf("nil req in calculateHints") + } + numaNodes := make([]int, 0, len(machineState)) for numaNode := range machineState { numaNodes = append(numaNodes, numaNode) @@ -169,8 +180,8 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err) } - numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) - numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) + numaBinding := qosutil.AnnotationsIndicateNUMABinding(req.Annotations) + numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(req.Annotations) // because it's hard to control memory allocation accurately, // we only support numa_binding but not exclusive container with request smaller than 1 NUMA @@ -210,6 +221,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa numaBound = minNUMAsCountNeeded + 1 } + preferredHintIndexes := []int{} machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { @@ -239,15 +251,246 @@ func (p *DynamicPolicy) calculateHints(reqInt int, machineState state.NUMANodeMa return } + preferred := maskCount == minNUMAsCountNeeded hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, 
&pluginapi.TopologyHint{ Nodes: machine.MaskToUInt64Array(mask), - Preferred: len(maskBits) == minNUMAsCountNeeded, + Preferred: preferred, }) + + if preferred { + preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1) + } }) + if numaBound > machine.MBWNUMAsPoint { + numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer) + + general.InfoS("getNUMAAllocatedMemBW", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "numaAllocatedMemBW", numaAllocatedMemBW) + + if err != nil { + general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err) + _ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw) + } else { + p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints, + reqInt, numaAllocatedMemBW, req) + } + } + return hints, nil } +func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer) (map[int]int, error) { + numaAllocatedMemBW := make(map[int]int) + podUIDToMemBWReq := make(map[string]int) + podUIDToBindingNUMAs := make(map[string]sets.Int) + + if metaServer == nil { + return nil, fmt.Errorf("getNUMAAllocatedMemBW got nil metaServer") + } + + for numaID, numaState := range machineState { + if numaState == nil { + general.Errorf("numaState is nil, NUMA: %d", numaID) + continue + } + + for _, entries := range numaState.PodEntries { + for _, allocationInfo := range entries { + if !(state.CheckDedicatedNUMABinding(allocationInfo) && allocationInfo.CheckMainContainer()) { + continue + } + + if _, found := podUIDToMemBWReq[allocationInfo.PodUid]; !found { + containerMemoryBandwidthRequest, err := spd.GetContainerMemoryBandwidthRequest(metaServer, metav1.ObjectMeta{ + UID: types.UID(allocationInfo.PodUid), + Namespace: allocationInfo.PodNamespace, + Name: allocationInfo.PodName, + Labels: allocationInfo.Labels, + Annotations: allocationInfo.Annotations, + }, 
int(math.Ceil(state.GetContainerRequestedCores()(allocationInfo)))) + if err != nil { + return nil, fmt.Errorf("GetContainerMemoryBandwidthRequest for pod: %s/%s, container: %s failed with error: %v", + allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err) + } + + podUIDToMemBWReq[allocationInfo.PodUid] = containerMemoryBandwidthRequest + } + + if podUIDToBindingNUMAs[allocationInfo.PodUid] == nil { + podUIDToBindingNUMAs[allocationInfo.PodUid] = sets.NewInt() + } + + podUIDToBindingNUMAs[allocationInfo.PodUid].Insert(numaID) + } + } + } + + for podUID, numaSet := range podUIDToBindingNUMAs { + podMemBWReq, found := podUIDToMemBWReq[podUID] + + if !found { + return nil, fmt.Errorf("pod: %s is found in podUIDToBindingNUMAs, but not found in podUIDToMemBWReq", podUID) + } + + numaCount := numaSet.Len() + + if numaCount == 0 { + continue + } + + perNUMAMemoryBandwidthRequest := podMemBWReq / numaCount + + for _, numaID := range numaSet.UnsortedList() { + numaAllocatedMemBW[numaID] += perNUMAMemoryBandwidthRequest + } + } + + return numaAllocatedMemBW, nil +} + +func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, reqInt int, + numaAllocatedMemBW map[int]int, req *pluginapi.ResourceRequest, +) { + if len(preferredHintIndexes) == 0 { + general.Infof("there is no preferred hints, skip update") + return + } else if req == nil { + general.Errorf("empty req") + return + } + + containerMemoryBandwidthRequest, err := spd.GetContainerMemoryBandwidthRequest(p.metaServer, + metav1.ObjectMeta{ + UID: types.UID(req.PodUid), + Namespace: req.PodNamespace, + Name: req.PodName, + Labels: req.Labels, + Annotations: req.Annotations, + }, reqInt) + if err != nil { + general.Errorf("GetContainerMemoryBandwidthRequest failed with error: %v", err) + return + } + + general.InfoS("GetContainerMemoryBandwidthRequest", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + 
"containerMemoryBandwidthRequest", containerMemoryBandwidthRequest) + + allFalse := true + for _, i := range preferredHintIndexes { + if len(cpuHints[i].Nodes) == 0 { + continue + } + + updated, err := getPreferenceByMemBW(cpuHints[i].Nodes, containerMemoryBandwidthRequest, + numaAllocatedMemBW, p.machineInfo, + p.metaServer, req) + if err != nil { + general.Errorf("getPreferenceByMemBW for hints: %#v failed with error: %v", cpuHints[i].Nodes, err) + _ = p.emitter.StoreInt64(util.MetricNameGetMemBWPreferenceFailed, 1, metrics.MetricTypeNameRaw) + continue + } + + if !updated { + cpuHints[i].Preferred = updated + + general.Infof("update hint: %#v preference to false", cpuHints[i].Nodes) + } else { + allFalse = false + } + } + + if allFalse { + // mem bw check is best-effort, if all preferred hints' preference are updated to false + // we should revert preference of them to true. + // topology manager will pick the final result after merge all hints. + for _, i := range preferredHintIndexes { + if !cpuHints[i].Preferred { + cpuHints[i].Preferred = true + general.Infof("revert hint: %#v preference to true", cpuHints[i].Nodes) + } + } + } +} + +func getPreferenceByMemBW(targetNUMANodesUInt64 []uint64, + containerMemoryBandwidthRequest int, numaAllocatedMemBW map[int]int, + machineInfo *machine.KatalystMachineInfo, + metaServer *metaserver.MetaServer, req *pluginapi.ResourceRequest, +) (bool, error) { + if req == nil { + return false, fmt.Errorf("empty req") + } else if len(targetNUMANodesUInt64) == 0 { + return false, fmt.Errorf("empty targetNUMANodes") + } else if machineInfo == nil || machineInfo.ExtraTopologyInfo == nil { + return false, fmt.Errorf("invalid machineInfo") + } else if metaServer == nil { + return false, fmt.Errorf("nil metaServer") + } + + targetNUMANodes := make([]int, len(targetNUMANodesUInt64)) + for i, numaID := range targetNUMANodesUInt64 { + var err error + targetNUMANodes[i], err = general.CovertUInt64ToInt(numaID) + if err != nil { + return 
false, fmt.Errorf("convert NUMA: %d to int failed with error: %v", numaID, err) + } + } + + perNUMAMemoryBandwidthRequest := containerMemoryBandwidthRequest / len(targetNUMANodes) + copiedNUMAAllocatedMemBW := general.DeepCopyIntToIntMap(numaAllocatedMemBW) + + for _, numaID := range targetNUMANodes { + copiedNUMAAllocatedMemBW[numaID] += perNUMAMemoryBandwidthRequest + } + + groupID := 0 + groupNUMAsAllocatedMemBW := make(map[int]int) + groupNUMAsAllocatableMemBW := make(map[int]int) + visNUMAs := sets.NewInt() + + // aggregate each target NUMA and all its sibling NUMAs into a group. + // calculate allocated and allocatable memory bandwidth for each group. + // currently, if there is one group whose allocated memory bandwidth is greater than its allocatable memory bandwidth, + // we will set preference of the hint to false. + // for the future, we can gather group statistics of each hint, + // and get the most suitable hint, then set its preference to true. + for _, numaID := range targetNUMANodes { + if visNUMAs.Has(numaID) { + continue + } + + groupNUMAsAllocatableMemBW[groupID] += int(machineInfo.ExtraTopologyInfo.SiblingNumaAvgMBWAllocatableMap[numaID]) + groupNUMAsAllocatedMemBW[groupID] += copiedNUMAAllocatedMemBW[numaID] + visNUMAs.Insert(numaID) + for _, siblingNUMAID := range machineInfo.ExtraTopologyInfo.SiblingNumaMap[numaID].UnsortedList() { + groupNUMAsAllocatedMemBW[groupID] += copiedNUMAAllocatedMemBW[siblingNUMAID] + groupNUMAsAllocatableMemBW[groupID] += int(machineInfo.ExtraTopologyInfo.SiblingNumaAvgMBWAllocatableMap[siblingNUMAID]) + visNUMAs.Insert(siblingNUMAID) + } + + general.InfoS("getPreferenceByMemBW", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "groupID", groupID, + "targetNUMANodes", targetNUMANodes, + "groupNUMAsAllocatedMemBW", groupNUMAsAllocatedMemBW[groupID], + "groupNUMAsAllocatableMemBW", groupNUMAsAllocatableMemBW[groupID]) + + if groupNUMAsAllocatedMemBW[groupID] > groupNUMAsAllocatableMemBW[groupID] { + 
return false, nil + } + + groupID++ + } + + return true, nil +} + func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, req *pluginapi.ResourceRequest, ) (*pluginapi.ResourceHintsResponse, error) { diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index ac8be7edd..0ccb277a3 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "os" "path/filepath" + "reflect" "strings" "testing" "time" @@ -49,6 +50,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metaserver/spd" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" @@ -88,8 +90,10 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st return nil, err } + extraTopologyInfo, _ := machine.GenerateDummyExtraTopology(topology.NumNUMANodes) machineInfo := &machine.KatalystMachineInfo{ - CPUTopology: topology, + CPUTopology: topology, + ExtraTopologyInfo: extraTopologyInfo, } reservedCPUs, _, err := calculator.TakeHTByNUMABalance(machineInfo, machineInfo.CPUDetails.CPUs().Clone(), 2) @@ -131,6 +135,7 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st MetaAgent: &agent.MetaAgent{ PodFetcher: &pod.PodFetcherStub{}, }, + ServiceProfilingManager: &spd.DummyServiceProfilingManager{}, } return policyImplement, nil @@ -938,6 +943,8 @@ func TestGetTopologyHints(t *testing.T) { as := require.New(t) cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) as.Nil(err) + highDensityCPUTopology, err := machine.GenerateDummyCPUTopology(384, 2, 12) + 
as.Nil(err) testName := "test" @@ -2404,6 +2411,55 @@ func TestGetTopologyHints(t *testing.T) { }, cpuTopology: cpuTopology, }, + { + description: "req for dedicated_cores with numa_binding & numa_exclusive main container in high density machine", + req: &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(v1.ResourceCPU), + ResourceRequests: map[string]float64{ + string(v1.ResourceCPU): 380, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{"numa_binding": "true", "numa_exclusive": "true"}`, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + expectedResp: &pluginapi.ResourceHintsResponse{ + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(v1.ResourceCPU), + ResourceHints: map[string]*pluginapi.ListOfTopologyHints{ + string(v1.ResourceCPU): { + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + Preferred: true, + }, + }, + }, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + consts.PodAnnotationMemoryEnhancementNumaExclusive: consts.PodAnnotationMemoryEnhancementNumaExclusiveEnable, + }, + }, + cpuTopology: highDensityCPUTopology, + }, } for _, tc := range testCases { @@ -4727,3 +4783,342 @@ func BenchmarkGetTopologyHints(b *testing.B) { _ = os.RemoveAll(tmpDir) } } + 
+func Test_getPreferenceByMemBW(t *testing.T) { + t.Parallel() + type args struct { + targetNUMANodesUInt64 []uint64 + containerMemoryBandwidthRequest int + numaAllocatedMemBW map[int]int + machineInfo *machine.KatalystMachineInfo + metaServer *metaserver.MetaServer + req *pluginapi.ResourceRequest + } + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + { + name: "req is nil", + args: args{}, + want: false, + wantErr: true, + }, + { + name: "targetNUMANodesUInt64 is empty", + args: args{ + req: &pluginapi.ResourceRequest{}, + targetNUMANodesUInt64: []uint64{}, + }, + want: false, + wantErr: true, + }, + { + name: "machineInfo is nil", + args: args{ + req: &pluginapi.ResourceRequest{}, + targetNUMANodesUInt64: []uint64{1}, + }, + want: false, + wantErr: true, + }, + { + name: "ExtraTopologyInfo is nil", + args: args{ + req: &pluginapi.ResourceRequest{}, + targetNUMANodesUInt64: []uint64{1}, + machineInfo: &machine.KatalystMachineInfo{}, + }, + want: false, + wantErr: true, + }, + { + name: "metaServer is nil", + args: args{ + req: &pluginapi.ResourceRequest{}, + targetNUMANodesUInt64: []uint64{1}, + machineInfo: &machine.KatalystMachineInfo{ExtraTopologyInfo: &machine.ExtraTopologyInfo{}}, + }, + want: false, + wantErr: true, + }, + } + for _, tt := range tests { + curTT := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := getPreferenceByMemBW(curTT.args.targetNUMANodesUInt64, curTT.args.containerMemoryBandwidthRequest, curTT.args.numaAllocatedMemBW, curTT.args.machineInfo, curTT.args.metaServer, curTT.args.req) + if (err != nil) != curTT.wantErr { + t.Errorf("getPreferenceByMemBW() error = %v, wantErr %v", err, curTT.wantErr) + return + } + if got != curTT.want { + t.Errorf("getPreferenceByMemBW() = %v, want %v", got, curTT.want) + } + }) + } +} + +func Test_getNUMAAllocatedMemBW(t *testing.T) { + t.Parallel() + as := require.New(t) + + policyImplement := &DynamicPolicy{} + 
state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) + + testName := "test" + highDensityCPUTopology, err := machine.GenerateDummyCPUTopology(384, 2, 12) + as.Nil(err) + podEntries := state.PodEntries{ + "373d08e4-7a6b-4293-aaaf-b135ff812kkk": state.ContainerEntries{ + testName: &state.AllocationInfo{ + PodUid: "373d08e4-7a6b-4293-aaaf-b135ff812kkk", + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + RampUp: false, + OwnerPoolName: state.PoolNameDedicated, + AllocationResult: machine.MustParse("1,8,9"), + OriginalAllocationResult: machine.MustParse("1,8,9"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(1, 8, 9), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(1, 8, 9), + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + QoSLevel: consts.PodAnnotationQoSLevelDedicatedCores, + RequestQuantity: 2, + }, + }, + "373d08e4-7a6b-4293-aaaf-b135ff8123bf": state.ContainerEntries{ + testName: &state.AllocationInfo{ + PodUid: "373d08e4-7a6b-4293-aaaf-b135ff8123bf", + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + RampUp: false, + OwnerPoolName: state.PoolNameShare, + AllocationResult: machine.MustParse("4-5,12,6-7,14"), + OriginalAllocationResult: machine.MustParse("4-5,12,6-7,14"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 12), + 3: machine.NewCPUSet(6, 7, 14), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 
12), + 3: machine.NewCPUSet(6, 7, 14), + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + RequestQuantity: 2, + }, + }, + "ec6e2f30-c78a-4bc4-9576-c916db5281a3": state.ContainerEntries{ + testName: &state.AllocationInfo{ + PodUid: "ec6e2f30-c78a-4bc4-9576-c916db5281a3", + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + RampUp: false, + OwnerPoolName: state.PoolNameShare, + AllocationResult: machine.MustParse("4-5,12,6-7,14"), + OriginalAllocationResult: machine.MustParse("4-5,12,6-7,14"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 12), + 3: machine.NewCPUSet(6, 7, 14), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 12), + 3: machine.NewCPUSet(6, 7, 14), + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + RequestQuantity: 2, + }, + }, + "2432d068-c5a0-46ba-a7bd-b69d9bd16961": state.ContainerEntries{ + testName: &state.AllocationInfo{ + PodUid: "2432d068-c5a0-46ba-a7bd-b69d9bd16961", + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + RampUp: false, + OwnerPoolName: state.PoolNameReclaim, + AllocationResult: machine.MustParse("9,11,13,15"), + OriginalAllocationResult: machine.MustParse("9,11,13,15"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(9), + 1: machine.NewCPUSet(11), + 2: 
machine.NewCPUSet(13), + 3: machine.NewCPUSet(15), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(9), + 1: machine.NewCPUSet(11), + 2: machine.NewCPUSet(13), + 3: machine.NewCPUSet(15), + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + }, + QoSLevel: consts.PodAnnotationQoSLevelReclaimedCores, + RequestQuantity: 2, + }, + }, + state.PoolNameReclaim: state.ContainerEntries{ + "": &state.AllocationInfo{ + PodUid: state.PoolNameReclaim, + OwnerPoolName: state.PoolNameReclaim, + AllocationResult: machine.MustParse("9,11,13,15"), + OriginalAllocationResult: machine.MustParse("9,11,13,15"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(9), + 1: machine.NewCPUSet(11), + 2: machine.NewCPUSet(13), + 3: machine.NewCPUSet(15), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.NewCPUSet(9), + 1: machine.NewCPUSet(11), + 2: machine.NewCPUSet(13), + 3: machine.NewCPUSet(15), + }, + }, + }, + state.PoolNameShare: state.ContainerEntries{ + "": &state.AllocationInfo{ + PodUid: state.PoolNameShare, + OwnerPoolName: state.PoolNameShare, + AllocationResult: machine.MustParse("4-5,12,6-7,14"), + OriginalAllocationResult: machine.MustParse("4-5,12,6-7,14"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 12), + 3: machine.NewCPUSet(6, 7, 14), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 2: machine.NewCPUSet(4, 5, 12), + 3: machine.NewCPUSet(6, 7, 14), + }, + }, + }, + "share-NUMA1": state.ContainerEntries{ + "": &state.AllocationInfo{ + PodUid: state.PoolNameShare, + OwnerPoolName: state.PoolNameShare, + AllocationResult: machine.MustParse("3,10"), + OriginalAllocationResult: machine.MustParse("3,10"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 
1: machine.NewCPUSet(3, 10), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 1: machine.NewCPUSet(3, 10), + }, + Annotations: map[string]string{ + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + "373d08e4-7a6b-4293-aaaf-b135ff812aaa": state.ContainerEntries{ + testName: &state.AllocationInfo{ + PodUid: "373d08e4-7a6b-4293-aaaf-b135ff812aaa", + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + RampUp: false, + OwnerPoolName: "share-NUMA1", + AllocationResult: machine.MustParse("3,10"), + OriginalAllocationResult: machine.MustParse("3,10"), + TopologyAwareAssignments: map[int]machine.CPUSet{ + 1: machine.NewCPUSet(3, 10), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 1: machine.NewCPUSet(3, 10), + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + RequestQuantity: 1, + }, + }, + } + machineState, err := generateMachineStateFromPodEntries(highDensityCPUTopology, podEntries) + as.Nil(err) + + type args struct { + machineState state.NUMANodeMap + metaServer *metaserver.MetaServer + } + tests := []struct { + name string + args args + want map[int]int + wantErr bool + }{ + { + name: "getNUMAAllocatedMemBW normally", + args: args{ + machineState: machineState, + metaServer: &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + PodFetcher: &pod.PodFetcherStub{}, + }, + ServiceProfilingManager: &spd.DummyServiceProfilingManager{}, + }, + }, + want: map[int]int{0: 0}, + 
wantErr: false, + }, + } + for _, tt := range tests { + curTT := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer) + if (err != nil) != curTT.wantErr { + t.Errorf("getNUMAAllocatedMemBW() error = %v, wantErr %v", err, curTT.wantErr) + return + } + if !reflect.DeepEqual(got, curTT.want) { + t.Errorf("getNUMAAllocatedMemBW() = %v, want %v", got, curTT.want) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index a40b1d500..602df9518 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -134,7 +134,9 @@ type DynamicPolicy struct { extraStateFileAbsPath string name string - podDebugAnnoKeys []string + podDebugAnnoKeys []string + podAnnotationKeptKeys []string + podLabelKeptKeys []string asyncWorkers *asyncworker.AsyncWorkers // defaultAsyncLimitedWorkers is general workers with default limit. @@ -203,6 +205,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration extraStateFileAbsPath: conf.ExtraStateFileAbsPath, name: fmt.Sprintf("%s_%s", agentName, memconsts.MemoryResourcePluginPolicyNameDynamic), podDebugAnnoKeys: conf.PodDebugAnnoKeys, + podAnnotationKeptKeys: conf.PodAnnotationKeptKeys, + podLabelKeptKeys: conf.PodLabelKeptKeys, asyncWorkers: asyncworker.NewAsyncWorkers(memoryPluginAsyncWorkersName, wrappedEmitter), defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers(memoryPluginAsyncWorkersName, defaultAsyncWorkLimit, wrappedEmitter), enableSettingMemoryMigrate: conf.EnableSettingMemoryMigrate, @@ -468,7 +472,7 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context, // we should do it before GetKatalystQoSLevelFromResourceReq. 
isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys) - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -734,7 +738,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context, // we should do it before GetKatalystQoSLevelFromResourceReq. isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys) - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 62d31a403..0143bf26d 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -3704,7 +3704,3 @@ func BenchmarkGetTopologyHints(b *testing.B) { _ = os.RemoveAll(tmpDir) } } - -func TestDummy(t *testing.T) { - fmt.Println("test") -} diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go index 53f070616..cc9514068 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go @@ -85,6 +85,9 @@ type StaticPolicy struct { netInterfaceNameResourceAllocationAnnotationKey string netClassIDResourceAllocationAnnotationKey string netBandwidthResourceAllocationAnnotationKey string + + podAnnotationKeptKeys []string + 
podLabelKeptKeys []string } // NewStaticPolicy returns a static network policy @@ -133,6 +136,8 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, stopCh: make(chan struct{}), name: fmt.Sprintf("%s_%s", agentName, NetworkResourcePluginPolicyNameStatic), qosLevelToNetClassMap: make(map[string]uint32), + podAnnotationKeptKeys: conf.PodAnnotationKeptKeys, + podLabelKeptKeys: conf.PodLabelKeptKeys, } if common.CheckCgroup2UnifiedMode() { @@ -250,7 +255,7 @@ func (p *StaticPolicy) GetTopologyHints(_ context.Context, return nil, fmt.Errorf("GetTopologyHints got nil req") } - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -485,7 +490,7 @@ func (p *StaticPolicy) Allocate(_ context.Context, // we copy original pod annotations here to use them later podAnnotations := maputil.CopySS(req.Annotations) - qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req) + qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) if err != nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) diff --git a/pkg/agent/qrm-plugins/util/consts.go b/pkg/agent/qrm-plugins/util/consts.go index 5fb7a0e4f..5d56c3295 100644 --- a/pkg/agent/qrm-plugins/util/consts.go +++ b/pkg/agent/qrm-plugins/util/consts.go @@ -28,11 +28,13 @@ const ( MetricNameAdvisorUnhealthy = "advisor_unhealthy" // metrics for cpu plugin - MetricNamePoolSize = "pool_size" - MetricNameRealStateInvalid = "real_state_invalid" - MetricNameCPUSetInvalid = "cpuset_invalid" - 
MetricNameCPUSetOverlap = "cpuset_overlap" - MetricNameOrphanContainer = "orphan_container" + MetricNamePoolSize = "pool_size" + MetricNameRealStateInvalid = "real_state_invalid" + MetricNameCPUSetInvalid = "cpuset_invalid" + MetricNameCPUSetOverlap = "cpuset_overlap" + MetricNameOrphanContainer = "orphan_container" + MetricNameGetMemBWPreferenceFailed = "get_mem_bw_preference_failed" + MetricNameGetNUMAAllocatedMemBWFailed = "get_numa_allocated_mem_bw_failed" // metrics for memory plugin MetricNameMemSetInvalid = "memset_invalid" diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index db0100fdb..6b3de1666 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -71,7 +71,9 @@ func IsDebugPod(podAnnotations map[string]string, podDebugAnnoKeys []string) boo } // GetKatalystQoSLevelFromResourceReq retrieves QoS Level for a given request -func GetKatalystQoSLevelFromResourceReq(qosConf *generic.QoSConfiguration, req *pluginapi.ResourceRequest) (qosLevel string, err error) { +func GetKatalystQoSLevelFromResourceReq(qosConf *generic.QoSConfiguration, req *pluginapi.ResourceRequest, + podAnnotationKeptKeys, podLabelKeptKeys []string, +) (qosLevel string, err error) { if req == nil { err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq got nil resource request") return @@ -89,13 +91,15 @@ func GetKatalystQoSLevelFromResourceReq(qosConf *generic.QoSConfiguration, req * req.Annotations = make(map[string]string) } req.Annotations[apiconsts.PodAnnotationQoSLevelKey] = qosLevel - req.Annotations = qosConf.FilterQoSAndEnhancementMap(req.Annotations) + req.Annotations = general.MergeMap(general.FilterStringToStringMapByKeys(podAnnotationKeptKeys, req.Annotations), + qosConf.FilterQoSAndEnhancementMap(req.Annotations)) if req.Labels == nil { req.Labels = make(map[string]string) } req.Labels[apiconsts.PodAnnotationQoSLevelKey] = qosLevel - req.Labels = qosConf.FilterQoSMap(req.Labels) + req.Labels = 
general.MergeMap(general.FilterStringToStringMapByKeys(podLabelKeptKeys, req.Labels), + qosConf.FilterQoSMap(req.Labels)) return } diff --git a/pkg/config/agent/qrm/qrm_base.go b/pkg/config/agent/qrm/qrm_base.go index e52ef0f70..84e3bfa64 100644 --- a/pkg/config/agent/qrm/qrm_base.go +++ b/pkg/config/agent/qrm/qrm_base.go @@ -22,6 +22,10 @@ type GenericQRMPluginConfiguration struct { ExtraStateFileAbsPath string PodDebugAnnoKeys []string UseKubeletReservedConfig bool + // PodAnnotationKeptKeys indicates pod annotation keys will be kept in qrm state + PodAnnotationKeptKeys []string + // PodLabelKeptKeys indicates pod label keys will be kept in qrm state + PodLabelKeptKeys []string } type QRMPluginsConfiguration struct { @@ -32,7 +36,10 @@ type QRMPluginsConfiguration struct { } func NewGenericQRMPluginConfiguration() *GenericQRMPluginConfiguration { - return &GenericQRMPluginConfiguration{} + return &GenericQRMPluginConfiguration{ + PodAnnotationKeptKeys: []string{}, + PodLabelKeptKeys: []string{}, + } } func NewQRMPluginsConfiguration() *QRMPluginsConfiguration { diff --git a/pkg/util/general/common.go b/pkg/util/general/common.go index 412f6c311..077fbc8ff 100644 --- a/pkg/util/general/common.go +++ b/pkg/util/general/common.go @@ -299,6 +299,21 @@ func MergeMapInt(src, override map[string]int) map[string]int { return res } +// FilterStringToStringMapByKeys returns a new map containing only the entries of originalMap whose keys appear in keptKeys +func FilterStringToStringMapByKeys(keptKeys []string, originalMap map[string]string) map[string]string { + if originalMap == nil { + return nil + } + + filteredMap := make(map[string]string) + for _, key := range keptKeys { + if val, ok := originalMap[key]; ok { + filteredMap[key] = val + } + } + return filteredMap +} + // GetSortedMapKeys returns a slice containing sorted keys for the given map func GetSortedMapKeys(m map[string]int) []string { ret := make([]string, 0, len(m)) diff --git a/pkg/util/general/deepcopy.go b/pkg/util/general/deepcopy.go index fda393d86..69c3d886d 100644
--- a/pkg/util/general/deepcopy.go +++ b/pkg/util/general/deepcopy.go @@ -51,3 +51,15 @@ func DeepCopyIntMap(origin map[string]int) map[string]int { } return res } + +func DeepCopyIntToIntMap(origin map[int]int) map[int]int { + if origin == nil { + return nil + } + + res := make(map[int]int, len(origin)) + for key, val := range origin { + res[key] = val + } + return res +} diff --git a/pkg/util/general/deepcopy_test.go b/pkg/util/general/deepcopy_test.go new file mode 100644 index 000000000..0da1289b6 --- /dev/null +++ b/pkg/util/general/deepcopy_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package general + +import ( + "reflect" + "testing" +) + +func TestDeepCopyIntToIntMap(t *testing.T) { + t.Parallel() + type args struct { + origin map[int]int + } + tests := []struct { + name string + args args + want map[int]int + }{ + { + name: "deep copy int to int map normally", + args: args{ + origin: map[int]int{1: 1}, + }, + want: map[int]int{ + 1: 1, + }, + }, + { + name: "deep copy nil int to int map", + args: args{ + origin: nil, + }, + want: nil, + }, + } + for _, tt := range tests { + curTT := tt + t.Run(curTT.name, func(t *testing.T) { + t.Parallel() + if got := DeepCopyIntToIntMap(curTT.args.origin); !reflect.DeepEqual(got, curTT.want) { + t.Errorf("DeepCopyIntToIntMap() = %v, want %v", got, curTT.want) + } + }) + } +} diff --git a/pkg/util/machine/util.go b/pkg/util/machine/util.go index b7e559b6e..19bf8a07b 100644 --- a/pkg/util/machine/util.go +++ b/pkg/util/machine/util.go @@ -22,6 +22,7 @@ import ( const ( LargeNUMAsPoint = 16 + MBWNUMAsPoint = 8 ) // TransformCPUAssignmentFormat transforms cpu assignment string format to cpuset format