diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_base.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_base.go new file mode 100644 index 000000000..7e844088a --- /dev/null +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_base.go @@ -0,0 +1,44 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package headroom + +import ( + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/errors" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom" +) + +type MemoryHeadroomPolicyOptions struct { + MemoryPolicyCanonicalOptions *MemoryPolicyCanonicalOptions +} + +func NewMemoryHeadroomPolicyOptions() *MemoryHeadroomPolicyOptions { + return &MemoryHeadroomPolicyOptions{ + MemoryPolicyCanonicalOptions: NewMemoryPolicyCanonicalOptions(), + } +} + +func (o *MemoryHeadroomPolicyOptions) AddFlags(fs *pflag.FlagSet) { + o.MemoryPolicyCanonicalOptions.AddFlags(fs) +} + +func (o *MemoryHeadroomPolicyOptions) ApplyTo(c *headroom.MemoryHeadroomPolicyConfiguration) error { + var errList []error + errList = append(errList, o.MemoryPolicyCanonicalOptions.ApplyTo(c.MemoryPolicyCanonicalConfiguration)) + return errors.NewAggregate(errList) +} diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go new file mode 100644 index 000000000..94b7b4847 --- /dev/null +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go @@ -0,0 +1,85 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package headroom + +import ( + "github.com/spf13/pflag" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom" +) + +const ( + defaultEnabled = false + defaultFreeBasedRatio = 0.6 + defaultStaticBasedCapacity = 20 << 30 // 20GB + + defaultCacheBaseRatio = 0 + defaultCPUMemRatioLowerBound = 1. / 6. + defaultCPUMemRatioUpperBound = 1. 
/ 3.5 +) + +type MemoryPolicyCanonicalOptions struct { + *MemoryUtilBasedOptions +} + +type MemoryUtilBasedOptions struct { + Enabled bool + FreeBasedRatio float64 + CacheBasedRatio float64 + StaticBasedCapacity float64 + + CPUMemRatioLowerBound float64 + CPUMemRatioUpperBound float64 +} + +func NewMemoryPolicyCanonicalOptions() *MemoryPolicyCanonicalOptions { + return &MemoryPolicyCanonicalOptions{ + MemoryUtilBasedOptions: &MemoryUtilBasedOptions{ + Enabled: defaultEnabled, + FreeBasedRatio: defaultFreeBasedRatio, + StaticBasedCapacity: defaultStaticBasedCapacity, + CacheBasedRatio: defaultCacheBaseRatio, + CPUMemRatioLowerBound: defaultCPUMemRatioLowerBound, + CPUMemRatioUpperBound: defaultCPUMemRatioUpperBound, + }, + } +} + +func (o *MemoryPolicyCanonicalOptions) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.Enabled, "memory-headroom-policy-canonical-util-based-enabled", o.Enabled, + "the flag to enable the util-based memory headroom buffer") + fs.Float64Var(&o.FreeBasedRatio, "memory-headroom-policy-canonical-free-based-ratio", o.FreeBasedRatio, + "the ratio of system free memory that can be used as buffer for memory oversold") + fs.Float64Var(&o.StaticBasedCapacity, "memory-headroom-policy-canonical-static-based-capacity", o.StaticBasedCapacity, + "the static oversold memory size in bytes") + fs.Float64Var(&o.CacheBasedRatio, "memory-headroom-policy-canonical-cache-based-ratio", o.CacheBasedRatio, + "the ratio of cache that can be oversold, 0 means cache oversold is disabled") + fs.Float64Var(&o.CPUMemRatioLowerBound, "memory-headroom-policy-canonical-cpu-mem-ratio-lower-bound", o.CPUMemRatioLowerBound, + "the lower bound of the node cpu to memory ratio for enabling cache oversold") + fs.Float64Var(&o.CPUMemRatioUpperBound, "memory-headroom-policy-canonical-cpu-mem-ratio-upper-bound", o.CPUMemRatioUpperBound, + "the upper bound of the node cpu to memory ratio for enabling cache oversold") +} + +func (o *MemoryPolicyCanonicalOptions) ApplyTo(c *headroom.MemoryPolicyCanonicalConfiguration) error { + c.Enabled = o.Enabled + c.FreeBasedRatio = o.FreeBasedRatio + c.StaticBasedCapacity = o.StaticBasedCapacity + c.CacheBasedRatio = o.CacheBasedRatio + c.CPUMemRatioLowerBound = o.CPUMemRatioLowerBound + c.CPUMemRatioUpperBound = o.CPUMemRatioUpperBound + return nil +} diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/memory_advisor.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/memory_advisor.go index 14c198063..fd72b7b3d 100644 --- a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/memory_advisor.go +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/memory_advisor.go @@ -18,7 +18,9 @@ package memory import ( "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/errors" + + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/memory/headroom" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory" ) @@ -26,12 +28,15 @@ import ( // MemoryAdvisorOptions holds the configurations for memory advisor in qos aware plugin type MemoryAdvisorOptions struct { MemoryHeadroomPolicyPriority []string + + *headroom.MemoryHeadroomPolicyOptions } // NewMemoryAdvisorOptions creates a new Options with a default config func NewMemoryAdvisorOptions() *MemoryAdvisorOptions { return &MemoryAdvisorOptions{ MemoryHeadroomPolicyPriority: []string{string(types.MemoryHeadroomPolicyCanonical)}, + MemoryHeadroomPolicyOptions:
headroom.NewMemoryHeadroomPolicyOptions(), } } @@ -39,6 +44,7 @@ func NewMemoryAdvisorOptions() *MemoryAdvisorOptions { func (o *MemoryAdvisorOptions) AddFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&o.MemoryHeadroomPolicyPriority, "memory-headroom-policy-priority", o.MemoryHeadroomPolicyPriority, "policy memory advisor to estimate resource headroom, sorted by priority descending order, should be formatted as 'policy1,policy2'") + o.MemoryHeadroomPolicyOptions.AddFlags(fs) } // ApplyTo fills up config with options @@ -46,5 +52,8 @@ func (o *MemoryAdvisorOptions) ApplyTo(c *memory.MemoryAdvisorConfiguration) err for _, policy := range o.MemoryHeadroomPolicyPriority { c.MemoryHeadroomPolicies = append(c.MemoryHeadroomPolicies, types.MemoryHeadroomPolicyName(policy)) } - return nil + + var errList []error + errList = append(errList, o.MemoryHeadroomPolicyOptions.ApplyTo(c.MemoryHeadroomPolicyConfiguration)) + return errors.NewAggregate(errList) } diff --git a/pkg/agent/resourcemanager/reporter/manager_test.go b/pkg/agent/resourcemanager/reporter/manager_test.go index 5fd19abb8..275e80e09 100644 --- a/pkg/agent/resourcemanager/reporter/manager_test.go +++ b/pkg/agent/resourcemanager/reporter/manager_test.go @@ -274,13 +274,6 @@ func Test_managerImpl_convertReportFieldsIfNeeded(t *testing.T) { Value: []byte("Value_a"), }, }, - testGroupVersionKindSecond: { - { - FieldType: v1alpha1.FieldType_Spec, - FieldName: "fieldName_b", - Value: []byte("Value_b"), - }, - }, }, }, want: map[v1.GroupVersionKind][]*v1alpha1.ReportField{ @@ -290,11 +283,6 @@ func Test_managerImpl_convertReportFieldsIfNeeded(t *testing.T) { FieldName: "fieldName_a", Value: []byte("Value_a"), }, - { - FieldType: v1alpha1.FieldType_Spec, - FieldName: "fieldName_b", - Value: []byte("Value_b"), - }, }, }, }, diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/provisionpolicy/policy_canonical.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/provisionpolicy/policy_canonical.go index 0b40f959b..145edf0cc 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/provisionpolicy/policy_canonical.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/provisionpolicy/policy_canonical.go @@ -17,7 +17,6 @@ limitations under the License. 
package provisionpolicy import ( - v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" @@ -51,7 +50,7 @@ func (p *PolicyCanonical) estimationCPUUsage() (cpuEstimation float64, container continue } - containerEstimation, err := helper.EstimateContainerResourceUsage(ci, v1.ResourceCPU, p.metaReader, p.essentials.EnableReclaim) + containerEstimation, err := helper.EstimateContainerCPUUsage(ci, p.metaReader, p.essentials.EnableReclaim) if err != nil { return 0, 0, err } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/estimation_canonical.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/estimation_canonical.go index 2bbae363d..467b5b03e 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/estimation_canonical.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/estimation_canonical.go @@ -19,13 +19,11 @@ package helper import ( "fmt" - v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" - apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) const ( @@ -36,48 +34,57 @@ const ( metricFallback string = "fallback" ) -// estimationFallbackValues is the resource estimation value when all methods fail -var estimationFallbackValues = map[v1.ResourceName]float64{ - v1.ResourceCPU: 4.0, - v1.ResourceMemory: 8 << 30, -} +const ( + estimationCPUFallbackValue = 4.0 + estimationMemoryFallbackValue float64 = 8 << 30 -// resourceMetricsToGather are the interested metrics for resource estimation -var resourceMetricsToGather = map[v1.ResourceName][]string{ - v1.ResourceCPU: { + estimationSharedDedicateQoSContainerBufferRatio = 1.1 + estimationSystemQoSContainerBufferRatio = 1.0 +) + +var ( + cpuMetricsToGather = []string{ consts.MetricCPUUsageContainer, consts.MetricLoad1MinContainer, consts.MetricLoad5MinContainer, - }, - v1.ResourceMemory: { + } + + memoryMetricsToGatherForSharedAndDedicatedQoS = []string{ consts.MetricMemRssContainer, - }, -} + consts.MetricMemCacheContainer, + consts.MetricMemShmemContainer, + } + + memoryMetricsToGatherForSystemQoS = []string{ + consts.MetricMemRssContainer, + } +) + +// EstimateContainerCPUUsage estimates the CPU usage of non-reclaimed (shared and dedicated cores) pods. +// If reclaim is disabled or the usage metrics are missing, the estimation falls back to the pod's CPU request. +func EstimateContainerCPUUsage(ci *types.ContainerInfo, metaReader metacache.MetaReader, reclaimEnable bool) (float64, error) { + if ci == nil { + return 0, fmt.Errorf("containerInfo nil") + } + + if metaReader == nil { + return 0, fmt.Errorf("metaCache nil") + } -// EstimateContainerResourceUsage used to estimate non-reclaimed pods resources usage. -// If reclaim disabled or metrics missed, resource usage will be regarded as Pod resource requests.
-func EstimateContainerResourceUsage(ci *types.ContainerInfo, resourceName v1.ResourceName, - metaReader metacache.MetaReader, reclaimEnable bool) (float64, error) { if ci.QoSLevel != apiconsts.PodAnnotationQoSLevelSharedCores && ci.QoSLevel != apiconsts.PodAnnotationQoSLevelDedicatedCores { return 0, nil } + var ( estimation float64 = 0 reference string ) + checkRequest := !reclaimEnable if !checkRequest { - metricsToGather, ok := resourceMetricsToGather[resourceName] - if !ok || len(metricsToGather) == 0 { - return 0, fmt.Errorf("failed to find metrics to gather for %v", resourceName) - } - if metaReader == nil { - return 0, fmt.Errorf("metaCache nil") - } - - for _, metricName := range metricsToGather { + for _, metricName := range cpuMetricsToGather { metricValue, err := metaReader.GetContainerMetric(ci.PodUID, ci.ContainerName, metricName) - klog.Infof("[qosaware-canonical] pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricName, metricValue) + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricName, metricValue) if err != nil || metricValue <= 0 { checkRequest = true continue @@ -90,16 +97,8 @@ func EstimateContainerResourceUsage(ci *types.ContainerInfo, resourceName v1.Res } if checkRequest { - request := 0.0 - switch resourceName { - case v1.ResourceCPU: - request = ci.CPURequest - case v1.ResourceMemory: - request = ci.MemoryRequest - default: - return 0, fmt.Errorf("invalid resourceName %v", resourceName) - } - klog.Infof("[qosaware-canonical] pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricRequest, request) + request := ci.CPURequest + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricRequest, request) if request > estimation { estimation = request reference = metricRequest @@ -107,16 +106,68 @@ func EstimateContainerResourceUsage(ci *types.ContainerInfo, resourceName v1.Res } if estimation <= 0 { - fallback, ok := estimationFallbackValues[resourceName] - if !ok { - return estimation, fmt.Errorf("failed to find estimation fallback value for %v", resourceName) - } - estimation = fallback + estimation = estimationCPUFallbackValue reference = metricFallback - klog.Infof("[qosaware-canonical] pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricFallback, fallback) + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricFallback, estimationCPUFallbackValue) } - klog.Infof("[qosaware-canonical] pod %v container %v estimation %.2f reference %v", ci.PodName, ci.ContainerName, estimation, reference) + general.Infof("pod %v container %v estimation %.2f reference %v", ci.PodName, ci.ContainerName, estimation, reference) + return estimation, nil +} + +// EstimateContainerMemoryUsage used to estimate non-reclaimed pods memory usage. +// If reclaim disabled or metrics missed, memory usage will be regarded as Pod memory requests. 
+func EstimateContainerMemoryUsage(ci *types.ContainerInfo, metaReader metacache.MetaReader, reclaimEnable bool) (float64, error) { + if ci == nil { + return 0, fmt.Errorf("containerInfo nil") + } + + if metaReader == nil { + return 0, fmt.Errorf("metaCache nil") + } + + var ( + estimation float64 = 0 + reference string + metricsToGather []string + estimationBufferRatio float64 + ) + + switch ci.QoSLevel { + case apiconsts.PodAnnotationQoSLevelSharedCores, apiconsts.PodAnnotationQoSLevelDedicatedCores: + metricsToGather = memoryMetricsToGatherForSharedAndDedicatedQoS + estimationBufferRatio = estimationSharedDedicateQoSContainerBufferRatio + case apiconsts.PodAnnotationQoSLevelSystemCores: + metricsToGather = memoryMetricsToGatherForSystemQoS + estimationBufferRatio = estimationSystemQoSContainerBufferRatio + default: + return 0, nil + } + + if reclaimEnable { + for _, metricName := range metricsToGather { + metricValue, err := metaReader.GetContainerMetric(ci.PodUID, ci.ContainerName, metricName) + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricName, metricValue) + if err != nil || metricValue <= 0 { + continue + } + estimation += metricValue + } + + if estimationBufferRatio > 0 { + estimation = estimation * estimationBufferRatio + } + } else { + estimation = ci.MemoryRequest + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricRequest, estimation) + } + + if estimation <= 0 { + estimation = estimationMemoryFallbackValue + reference = metricFallback + general.Infof("pod %v container %v metric %v value %v", ci.PodName, ci.ContainerName, metricFallback, estimationMemoryFallbackValue) + } + general.Infof("pod %v container %v estimation %.2f reference %v", ci.PodName, ci.ContainerName, estimation, reference) return estimation, nil } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical.go index 36407405c..d282d3193 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical.go @@ -20,17 +20,18 @@ import ( "fmt" "math" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/klog/v2" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/helper" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom" + "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) type PolicyCanonical struct { @@ -39,31 +40,25 @@ type PolicyCanonical struct { // memoryHeadroom is valid to be used iff updateStatus successes memoryHeadroom float64 updateStatus types.PolicyUpdateStatus + + qosConfig *generic.QoSConfiguration + policyCanonicalConfig *headroom.MemoryPolicyCanonicalConfiguration } -func NewPolicyCanonical(_ *config.Configuration, _ interface{}, metaReader metacache.MetaReader, +func NewPolicyCanonical(conf *config.Configuration, _ interface{}, metaReader metacache.MetaReader, metaServer 
*metaserver.MetaServer, _ metrics.MetricEmitter) HeadroomPolicy { p := PolicyCanonical{ - PolicyBase: NewPolicyBase(metaReader, metaServer), - updateStatus: types.PolicyUpdateFailed, + PolicyBase: NewPolicyBase(metaReader, metaServer), + updateStatus: types.PolicyUpdateFailed, + qosConfig: conf.QoSConfiguration, + policyCanonicalConfig: conf.MemoryHeadroomPolicyConfiguration.MemoryPolicyCanonicalConfiguration, } return &p } -func (p *PolicyCanonical) estimateContainerMemoryUsage(ci *types.ContainerInfo) (float64, error) { - return helper.EstimateContainerResourceUsage(ci, v1.ResourceMemory, p.metaReader, p.essentials.EnableReclaim) -} - -func (p *PolicyCanonical) Update() (err error) { - defer func() { - if err != nil { - p.updateStatus = types.PolicyUpdateFailed - } else { - p.updateStatus = types.PolicyUpdateSucceeded - } - }() - +// estimateNonReclaimedQoSMemoryRequirement estimates the memory requirement of all containers that are not reclaimed +func (p *PolicyCanonical) estimateNonReclaimedQoSMemoryRequirement() (float64, error) { var ( memoryEstimation float64 = 0 containerCnt float64 = 0 @@ -71,23 +66,57 @@ func (p *PolicyCanonical) Update() (err error) { ) f := func(podUID string, containerName string, ci *types.ContainerInfo) bool { - containerEstimation, err := p.estimateContainerMemoryUsage(ci) + containerEstimation, err := helper.EstimateContainerMemoryUsage(ci, p.metaReader, p.essentials.EnableReclaim) if err != nil { errList = append(errList, err) return true } - klog.Infof("[qosaware-memory-headroom] pod %v container %v estimation %.2e", ci.PodName, containerName, containerEstimation) + general.Infof("pod %v container %v estimation %.2e", ci.PodName, containerName, containerEstimation) memoryEstimation += containerEstimation containerCnt += 1 return true } p.metaReader.RangeContainer(f) - klog.Infof("[qosaware-memory-headroom] memory requirement estimation: %.2e, #container %v", memoryEstimation, containerCnt) + general.Infof("memory requirement estimation: %.2e, #container %v", memoryEstimation, containerCnt) + + return memoryEstimation, errors.NewAggregate(errList) +} + +func (p *PolicyCanonical) Update() (err error) { + defer func() { + if err != nil { + p.updateStatus = types.PolicyUpdateFailed + } else { + p.updateStatus = types.PolicyUpdateSucceeded + } + }() + + var ( + memoryEstimateRequirement float64 + memoryBuffer float64 + ) + + maxAllocatableMemory := float64(p.essentials.Total - p.essentials.ReservedForAllocate) + memoryEstimateRequirement, err = p.estimateNonReclaimedQoSMemoryRequirement() + if err != nil { + return err + } + memoryHeadroomWithoutBuffer := math.Max(maxAllocatableMemory-memoryEstimateRequirement, 0) + + if p.policyCanonicalConfig.Enabled { + memoryBuffer, err = p.calculateMemoryBuffer(memoryEstimateRequirement) + if err != nil { + return err + } + } - p.memoryHeadroom = math.Max(float64(p.essentials.Total-p.essentials.ReservedForAllocate)-memoryEstimation, 0) + p.memoryHeadroom = math.Max(memoryHeadroomWithoutBuffer+memoryBuffer, 0) + p.memoryHeadroom = math.Min(p.memoryHeadroom, maxAllocatableMemory) + general.Infof("without buffer memory headroom: %.2e, final memory headroom: %.2e, memory buffer: %.2e, max memory allocatable: %.2e", + memoryHeadroomWithoutBuffer, p.memoryHeadroom, memoryBuffer, maxAllocatableMemory) - return errors.NewAggregate(errList) + return nil } func (p *PolicyCanonical) GetHeadroom() (resource.Quantity, error) { @@ -95,5 +124,5 @@ func (p *PolicyCanonical) GetHeadroom() (resource.Quantity, error) { return 
resource.Quantity{}, fmt.Errorf("last update failed") } - return *resource.NewQuantity(int64(p.memoryHeadroom), resource.DecimalSI), nil + return *resource.NewQuantity(int64(p.memoryHeadroom), resource.BinarySI), nil } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_buffer.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_buffer.go new file mode 100644 index 000000000..886238c78 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_buffer.go @@ -0,0 +1,193 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package headroompolicy + +import ( + "context" + "fmt" + "math" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/metric" +) + +// calculateMemoryBuffer calculates the extra memory buffer that can be added to the headroom, +// based on system memory info and the memory usage of non-reclaimed QoS pods +func (p *PolicyCanonical) calculateMemoryBuffer(estimateNonReclaimedRequirement float64) (float64, error) { + var ( + systemMetrics *systemMemoryMetrics + systemQoSMetrics *memoryMetrics + sharedAndDedicateQoSMetrics *memoryMetrics + nodeCPUMemoryRatio float64 + err error + ) + + systemMetrics, err = p.getSystemMemoryInfo() + if err != nil { + return 0, err + } + + sharedAndDedicateQoSMetrics, err = p.getMemoryMetrics(p.filterSharedAndDedicateQoSPods) + if err != nil { + return 0, err + } + + systemQoSMetrics, err = p.getMemoryMetrics(p.filterSystemQoSPods) + if err != nil { + return 0, err + } + + nodeCPUMemoryRatio, err = p.getNodeCPUMemoryRatio() + if err != nil { + return 0, err + } + + // calculate system buffer, reserving double the watermark scale_factor to make kswapd less likely to be triggered + systemFactor := systemMetrics.memoryTotal * 2 * systemMetrics.scaleFactor / 10000 + systemBuffer := systemMetrics.memoryFree*p.policyCanonicalConfig.FreeBasedRatio - systemFactor + + // calculate the total memory used by shared and dedicated qos pods + sharedAndDedicateQoSTotalUsed := sharedAndDedicateQoSMetrics.rss + sharedAndDedicateQoSMetrics.shmem + + sharedAndDedicateQoSMetrics.cache + + // calculate system qos extra usage, counting only shmem and cache + systemQoSExtraUsage := systemQoSMetrics.cache + systemQoSMetrics.shmem + + // calculate non-reclaimed qos extra usage + nonReclaimedQoSExtraUsage := estimateNonReclaimedRequirement - sharedAndDedicateQoSTotalUsed + systemQoSExtraUsage + + // calculate the buffer by subtracting the non-reclaimed qos extra usage from the system buffer + buffer := math.Max(systemBuffer-nonReclaimedQoSExtraUsage, 0) + + // calculate the cache oversold buffer if the cache oversold ratio is set, the cpu/memory ratio is in range, and memory utilization is not 0 + if p.policyCanonicalConfig.CacheBasedRatio > 0 && nodeCPUMemoryRatio > p.policyCanonicalConfig.CPUMemRatioLowerBound && + nodeCPUMemoryRatio < p.policyCanonicalConfig.CPUMemRatioUpperBound && systemMetrics.memoryUtilization > 0 { + cacheBuffer :=
(systemMetrics.memoryTotal*(1-systemMetrics.memoryUtilization) - systemMetrics.memoryFree) * + p.policyCanonicalConfig.CacheBasedRatio + buffer = math.Max(buffer, cacheBuffer) + general.Infof("cache oversold rate: %.2f, cache buffer: %.2e, buffer: %.2e", + p.policyCanonicalConfig.CacheBasedRatio, cacheBuffer, buffer) + } + + // add static oversold buffer + result := buffer + p.policyCanonicalConfig.StaticBasedCapacity + general.Infof("system buffer: %.2e, non-reclaimed QoS extra usage: %.2e, static oversold: %.2e, result: %.2e", + systemBuffer, nonReclaimedQoSExtraUsage, p.policyCanonicalConfig.StaticBasedCapacity, result) + return result, nil +} + +type systemMemoryMetrics struct { + memoryTotal float64 + memoryFree float64 + scaleFactor float64 + memoryUtilization float64 +} + +// getSystemMemoryInfo gets system memory info from the meta server +func (p *PolicyCanonical) getSystemMemoryInfo() (*systemMemoryMetrics, error) { + var ( + info systemMemoryMetrics + err error + ) + + info.memoryTotal, err = p.metaServer.GetNodeMetric(consts.MetricMemTotalSystem) + if err != nil { + return nil, err + } + + info.memoryFree, err = p.metaServer.GetNodeMetric(consts.MetricMemFreeSystem) + if err != nil { + return nil, err + } + + info.scaleFactor, err = p.metaServer.GetNodeMetric(consts.MetricMemScaleFactorSystem) + if err != nil { + return nil, err + } + + used, err := p.metaServer.GetNodeMetric(consts.MetricMemUsedSystem) + if err != nil { + return nil, err + } + info.memoryUtilization = used / info.memoryTotal + + return &info, nil +} + +// getNodeCPUMemoryRatio gets the node cpu to memory (in GB) ratio from the meta server +func (p *PolicyCanonical) getNodeCPUMemoryRatio() (float64, error) { + cpuCapacity := p.metaServer.MachineInfo.NumCores + memoryCapacity := p.metaServer.MemoryCapacity + if memoryCapacity == 0 { + return 0, fmt.Errorf("memory capacity is 0") + } + + return float64(cpuCapacity) / (float64(memoryCapacity) / 1024 / 1024 / 1024), nil +} + +type memoryMetrics struct { + cache float64 + shmem float64 + rss float64 +} + +// getMemoryMetrics gets aggregated memory metrics from the meta server for pods matching the given filter +func (p *PolicyCanonical) getMemoryMetrics(filter func(pod *v1.Pod) bool) (*memoryMetrics, error) { + regionPods, err := p.metaServer.GetPodList(context.Background(), filter) + if err != nil { + return nil, err + } + + cache := p.metaServer.AggregatePodMetric(regionPods, consts.MetricMemCacheContainer, metric.AggregatorSum, metric.DefaultContainerMetricFilter) + shmem := p.metaServer.AggregatePodMetric(regionPods, consts.MetricMemShmemContainer, metric.AggregatorSum, metric.DefaultContainerMetricFilter) + rss := p.metaServer.AggregatePodMetric(regionPods, consts.MetricMemRssContainer, metric.AggregatorSum, metric.DefaultContainerMetricFilter) + return &memoryMetrics{ + cache: cache, + shmem: shmem, + rss: rss, + }, nil +} + +// filterSystemQoSPods filters system qos pods +func (p *PolicyCanonical) filterSystemQoSPods(pod *v1.Pod) bool { + if ok, err := p.qosConfig.CheckSystemQoSForPod(pod); err != nil { + klog.Errorf("filter pod %v err: %v", pod.Name, err) + return false + } else { + return ok + } +} + +// filterSharedAndDedicateQoSPods filters shared and dedicated qos pods +func (p *PolicyCanonical) filterSharedAndDedicateQoSPods(pod *v1.Pod) bool { + isSharedQoS, err := p.qosConfig.CheckSharedQoSForPod(pod) + if err != nil { + klog.Errorf("filter pod %v err: %v", pod.Name, err) + return false + } + + isDedicateQoS, err := p.qosConfig.CheckDedicatedQoSForPod(pod) + if err != nil { + klog.Errorf("filter pod %v err: %v",
pod.Name, err) + return false + } + + return isSharedQoS || isDedicateQoS +} diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_test.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_test.go new file mode 100644 index 000000000..e4541fe3e --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/headroompolicy/policy_canonical_test.go @@ -0,0 +1,459 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package headroompolicy + +import ( + "io/ioutil" + "os" + "testing" + + info "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + qrmstate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom" + pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" + utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric" +) + +var ( + qosLevel2PoolName = map[string]string{ + consts.PodAnnotationQoSLevelSharedCores: qrmstate.PoolNameShare, + consts.PodAnnotationQoSLevelReclaimedCores: qrmstate.PoolNameReclaim, + consts.PodAnnotationQoSLevelSystemCores: qrmstate.PoolNameReserve, + consts.PodAnnotationQoSLevelDedicatedCores: qrmstate.PoolNameDedicated, + } +) + +func generateTestConfiguration(t *testing.T, checkpointDir, stateFileDir string) *config.Configuration { + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + conf.GenericSysAdvisorConfiguration.StateFileDirectory = stateFileDir + conf.MetaServerConfiguration.CheckpointManagerDir = checkpointDir + + return conf +} + +func generateTestMetaServer(t *testing.T, podList []*v1.Pod, + metricsFetcher metric.MetricsFetcher) *metaserver.MetaServer { + // numa node0 cpu(s): 0-23,48-71 + // numa node1 cpu(s): 24-47,72-95 + cpuTopology, err := machine.GenerateDummyCPUTopology(96, 2, 2) + require.NoError(t, err) + + metaServer := &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + KatalystMachineInfo: &machine.KatalystMachineInfo{ + MachineInfo: &info.MachineInfo{ + NumCores: 96, + MemoryCapacity: 500 << 30, + }, + CPUTopology: 
cpuTopology, + }, + PodFetcher: &pod.PodFetcherStub{PodList: podList}, + MetricsFetcher: metricsFetcher, + }, + } + return metaServer +} + +func makeContainerInfo(podUID, namespace, podName, containerName, qoSLevel string, annotations map[string]string, + topologyAwareAssignments types.TopologyAwareAssignment, memoryRequest float64) *types.ContainerInfo { + return &types.ContainerInfo{ + PodUID: podUID, + PodNamespace: namespace, + PodName: podName, + ContainerName: containerName, + ContainerIndex: 0, + Labels: nil, + Annotations: annotations, + QoSLevel: qoSLevel, + CPURequest: 0, + MemoryRequest: memoryRequest, + RampUp: false, + OwnerPoolName: qosLevel2PoolName[qoSLevel], + TopologyAwareAssignments: topologyAwareAssignments, + OriginalTopologyAwareAssignments: topologyAwareAssignments, + } +} + +func TestPolicyCanonical_calculateMemoryBuffer(t *testing.T) { + type fields struct { + podList []*v1.Pod + containers []*types.ContainerInfo + policyCanonicalConfiguration *headroom.MemoryPolicyCanonicalConfiguration + essentials types.ResourceEssentials + setFakeMetric func(store *utilmetric.MetricStore) + } + type args struct { + estimateNonReclaimedRequirement float64 + } + tests := []struct { + name string + fields fields + args args + want resource.Quantity + wantErr bool + }{ + { + name: "normal disable buffer", + fields: fields{ + podList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + UID: "pod2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSystemCores, + }, + }, + }, + }, + containers: []*types.ContainerInfo{ + makeContainerInfo("pod1", "default", + "pod1", "container1", + consts.PodAnnotationQoSLevelSharedCores, nil, + nil, 1), + makeContainerInfo("pod2", "default", + "pod2", "container2", + consts.PodAnnotationQoSLevelSystemCores, nil, + nil, 1), + }, + policyCanonicalConfiguration: &headroom.MemoryPolicyCanonicalConfiguration{ + MemoryUtilBasedConfiguration: &headroom.MemoryUtilBasedConfiguration{ + Enabled: false, + }, + }, + essentials: types.ResourceEssentials{ + EnableReclaim: true, + Total: 100 << 30, + ReservedForAllocate: 4 << 30, + }, + setFakeMetric: func(store *utilmetric.MetricStore) { + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemCacheContainer, 10<<30) + + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod2", "container1", pkgconsts.MetricMemCacheContainer, 10<<30) + }, + }, + want: *resource.NewQuantity((96<<30)-(20<<30)*1.1-(10<<30), resource.BinarySI), + }, + { + name: "normal enable buffer", + fields: fields{ + podList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + UID: "pod2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSystemCores, + }, + }, + Spec: v1.PodSpec{ + Containers: 
[]v1.Container{ + { + Name: "container2", + }, + }, + }, + }, + }, + containers: []*types.ContainerInfo{ + makeContainerInfo("pod1", "default", + "pod1", "container1", + consts.PodAnnotationQoSLevelSharedCores, nil, + nil, 1), + makeContainerInfo("pod2", "default", + "pod2", "container2", + consts.PodAnnotationQoSLevelSystemCores, nil, + nil, 1), + }, + policyCanonicalConfiguration: &headroom.MemoryPolicyCanonicalConfiguration{ + MemoryUtilBasedConfiguration: &headroom.MemoryUtilBasedConfiguration{ + Enabled: true, + FreeBasedRatio: 0.6, + StaticBasedCapacity: 20 << 30, + }, + }, + essentials: types.ResourceEssentials{ + EnableReclaim: true, + Total: 100 << 30, + ReservedForAllocate: 4 << 30, + }, + setFakeMetric: func(store *utilmetric.MetricStore) { + store.SetNodeMetric(pkgconsts.MetricMemTotalSystem, 100<<30) + store.SetNodeMetric(pkgconsts.MetricMemFreeSystem, 60<<30) + store.SetNodeMetric(pkgconsts.MetricMemScaleFactorSystem, 500) + store.SetNodeMetric(pkgconsts.MetricMemUsedSystem, 40<<30) + + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemCacheContainer, 10<<30) + + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemCacheContainer, 10<<30) + }, + }, + want: *resource.NewQuantity((96<<30)-((20<<30)*1.1+(10<<30))+((60<<30)*0.6-(10<<30)-((20<<30)*1.1+(10<<30)-(20<<30)+(10<<30))+(20<<30)), resource.BinarySI), + }, + { + name: "enable buffer but memory free is not enough", + fields: fields{ + podList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + UID: "pod2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSystemCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container2", + }, + }, + }, + }, + }, + containers: []*types.ContainerInfo{ + makeContainerInfo("pod1", "default", + "pod1", "container1", + consts.PodAnnotationQoSLevelSharedCores, nil, + nil, 1), + makeContainerInfo("pod2", "default", + "pod2", "container2", + consts.PodAnnotationQoSLevelSystemCores, nil, + nil, 1), + }, + policyCanonicalConfiguration: &headroom.MemoryPolicyCanonicalConfiguration{ + MemoryUtilBasedConfiguration: &headroom.MemoryUtilBasedConfiguration{ + Enabled: true, + FreeBasedRatio: 0.6, + StaticBasedCapacity: 20 << 30, + }, + }, + essentials: types.ResourceEssentials{ + EnableReclaim: true, + Total: 100 << 30, + ReservedForAllocate: 4 << 30, + }, + setFakeMetric: func(store *utilmetric.MetricStore) { + store.SetNodeMetric(pkgconsts.MetricMemTotalSystem, 100<<30) + store.SetNodeMetric(pkgconsts.MetricMemFreeSystem, 30<<30) + store.SetNodeMetric(pkgconsts.MetricMemScaleFactorSystem, 500) + store.SetNodeMetric(pkgconsts.MetricMemUsedSystem, 40<<30) + + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemCacheContainer, 10<<30) + + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod2", "container2", 
pkgconsts.MetricMemCacheContainer, 10<<30) + }, + }, + want: *resource.NewQuantity((96<<30)-((20<<30)*1.1+(10<<30))+(20<<30), resource.BinarySI), + }, + { + name: "enable buffer and cache oversold", + fields: fields{ + podList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + UID: "pod2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSystemCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container2", + }, + }, + }, + }, + }, + containers: []*types.ContainerInfo{ + makeContainerInfo("pod1", "default", + "pod1", "container1", + consts.PodAnnotationQoSLevelSharedCores, nil, + nil, 1), + makeContainerInfo("pod2", "default", + "pod2", "container2", + consts.PodAnnotationQoSLevelSystemCores, nil, + nil, 1), + }, + policyCanonicalConfiguration: &headroom.MemoryPolicyCanonicalConfiguration{ + MemoryUtilBasedConfiguration: &headroom.MemoryUtilBasedConfiguration{ + Enabled: true, + CacheBasedRatio: 0.6, + CPUMemRatioLowerBound: 1. / 6., + CPUMemRatioUpperBound: 1. / 3.5, + FreeBasedRatio: 0.6, + StaticBasedCapacity: 20 << 30, + }, + }, + essentials: types.ResourceEssentials{ + EnableReclaim: true, + Total: 100 << 30, + ReservedForAllocate: 4 << 30, + }, + setFakeMetric: func(store *utilmetric.MetricStore) { + store.SetNodeMetric(pkgconsts.MetricMemTotalSystem, 100<<30) + store.SetNodeMetric(pkgconsts.MetricMemFreeSystem, 20<<30) + store.SetNodeMetric(pkgconsts.MetricMemScaleFactorSystem, 500) + store.SetNodeMetric(pkgconsts.MetricMemUsedSystem, 60<<30) + + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemRssContainer, 15<<30) + store.SetContainerMetric("pod1", "container1", pkgconsts.MetricMemCacheContainer, 15<<30) + + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemRssContainer, 10<<30) + store.SetContainerMetric("pod2", "container2", pkgconsts.MetricMemCacheContainer, 10<<30) + }, + }, + want: *resource.NewQuantity((96<<30)-((30<<30)*1.1+(10<<30))+((40<<30)-(20<<30))*0.6+(20<<30), resource.BinarySI), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ckDir, err := ioutil.TempDir("", "checkpoint") + require.NoError(t, err) + defer os.RemoveAll(ckDir) + + sfDir, err := ioutil.TempDir("", "statefile") + require.NoError(t, err) + defer os.RemoveAll(sfDir) + + conf := generateTestConfiguration(t, ckDir, sfDir) + conf.MemoryPolicyCanonicalConfiguration = tt.fields.policyCanonicalConfiguration + + metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}) + metaCache, err := metacache.NewMetaCacheImp(conf, metricsFetcher) + require.NoError(t, err) + + for _, c := range tt.fields.containers { + err := metaCache.SetContainerInfo(c.PodUID, c.ContainerName, c) + assert.NoError(t, err) + } + + metaServer := generateTestMetaServer(t, tt.fields.podList, metricsFetcher) + + p := NewPolicyCanonical(conf, nil, metaCache, metaServer, metrics.DummyMetrics{}) + + store := utilmetric.GetMetricStoreInstance() + tt.fields.setFakeMetric(store) + + p.SetEssentials(tt.fields.essentials) + + err = p.Update() + require.NoError(t, err) + got, err := p.GetHeadroom() + if (err != nil) != tt.wantErr { + 
t.Errorf("calculateMemoryBuffer() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("calculateMemoryBuffer() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_base.go b/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_base.go new file mode 100644 index 000000000..dbbebf096 --- /dev/null +++ b/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_base.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package headroom + +import "github.com/kubewharf/katalyst-core/pkg/config/dynamic" + +type MemoryHeadroomPolicyConfiguration struct { + MemoryPolicyCanonicalConfiguration *MemoryPolicyCanonicalConfiguration +} + +func NewMemoryHeadroomPolicyConfiguration() *MemoryHeadroomPolicyConfiguration { + return &MemoryHeadroomPolicyConfiguration{ + MemoryPolicyCanonicalConfiguration: NewMemoryPolicyCanonicalConfiguration(), + } +} + +func (c *MemoryHeadroomPolicyConfiguration) ApplyConfiguration(defaultConf *MemoryHeadroomPolicyConfiguration, conf *dynamic.DynamicConfigCRD) { + c.MemoryPolicyCanonicalConfiguration.ApplyConfiguration(defaultConf.MemoryPolicyCanonicalConfiguration, conf) +} diff --git a/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go b/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go new file mode 100644 index 000000000..fe7ad426f --- /dev/null +++ b/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom/policy_canonical.go @@ -0,0 +1,41 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package headroom + +import "github.com/kubewharf/katalyst-core/pkg/config/dynamic" + +type MemoryPolicyCanonicalConfiguration struct { + *MemoryUtilBasedConfiguration +} + +type MemoryUtilBasedConfiguration struct { + Enabled bool + FreeBasedRatio float64 + StaticBasedCapacity float64 + CacheBasedRatio float64 + CPUMemRatioLowerBound float64 + CPUMemRatioUpperBound float64 +} + +func NewMemoryPolicyCanonicalConfiguration() *MemoryPolicyCanonicalConfiguration { + return &MemoryPolicyCanonicalConfiguration{ + MemoryUtilBasedConfiguration: &MemoryUtilBasedConfiguration{}, + } +} + +func (c *MemoryPolicyCanonicalConfiguration) ApplyConfiguration(*MemoryPolicyCanonicalConfiguration, *dynamic.DynamicConfigCRD) { +} diff --git a/pkg/config/agent/sysadvisor/qosaware/resource/memory/memory_advisor.go b/pkg/config/agent/sysadvisor/qosaware/resource/memory/memory_advisor.go index 1b436e0e3..d93c342f5 100644 --- a/pkg/config/agent/sysadvisor/qosaware/resource/memory/memory_advisor.go +++ b/pkg/config/agent/sysadvisor/qosaware/resource/memory/memory_advisor.go @@ -18,21 +18,26 @@ package memory import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/qosaware/resource/memory/headroom" "github.com/kubewharf/katalyst-core/pkg/config/dynamic" ) // MemoryAdvisorConfiguration stores configurations of memory advisors in qos aware plugin type MemoryAdvisorConfiguration struct { MemoryHeadroomPolicies []types.MemoryHeadroomPolicyName + + *headroom.MemoryHeadroomPolicyConfiguration } // NewMemoryAdvisorConfiguration creates new memory advisor configurations func NewMemoryAdvisorConfiguration() *MemoryAdvisorConfiguration { return &MemoryAdvisorConfiguration{ - MemoryHeadroomPolicies: make([]types.MemoryHeadroomPolicyName, 0), + MemoryHeadroomPolicies: make([]types.MemoryHeadroomPolicyName, 0), + MemoryHeadroomPolicyConfiguration: headroom.NewMemoryHeadroomPolicyConfiguration(), } } // ApplyConfiguration is used to set configuration based on conf. -func (c *MemoryAdvisorConfiguration) ApplyConfiguration(*MemoryAdvisorConfiguration, *dynamic.DynamicConfigCRD) { +func (c *MemoryAdvisorConfiguration) ApplyConfiguration(defaultConf *MemoryAdvisorConfiguration, conf *dynamic.DynamicConfigCRD) { + c.MemoryHeadroomPolicyConfiguration.ApplyConfiguration(defaultConf.MemoryHeadroomPolicyConfiguration, conf) }
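As a quick illustration of the estimation flow introduced in estimation_canonical.go: with reclaim enabled, the helper sums the gathered usage metrics and applies a QoS-dependent buffer ratio; with reclaim disabled it falls back to the memory request, and to a static value if nothing else is available. The sketch below is a minimal standalone approximation of that branching, using plain structs and hard-coded QoS strings in place of the real ContainerInfo/MetaReader types, so it is illustrative rather than the actual Katalyst API.

```go
package main

import "fmt"

// containerSample is a hypothetical stand-in for the sys-advisor ContainerInfo
// plus the metrics that would normally come from the metaReader.
type containerSample struct {
	qosLevel          string  // e.g. "shared_cores", "dedicated_cores", "system_cores"
	memoryRequest     float64 // bytes
	rss, cache, shmem float64 // bytes; <= 0 is treated as a missing metric
}

const (
	memoryFallbackBytes        = float64(8 << 30) // fallback when nothing else is available
	sharedDedicatedBufferRatio = 1.1              // buffer ratio for shared/dedicated cores
	systemQoSBufferRatio       = 1.0              // buffer ratio for system cores
)

// estimateMemory mirrors the branching of EstimateContainerMemoryUsage.
func estimateMemory(c containerSample, reclaimEnabled bool) float64 {
	var metrics []float64
	ratio := 1.0
	switch c.qosLevel {
	case "shared_cores", "dedicated_cores":
		metrics = []float64{c.rss, c.cache, c.shmem}
		ratio = sharedDedicatedBufferRatio
	case "system_cores":
		metrics = []float64{c.rss}
		ratio = systemQoSBufferRatio
	default:
		return 0 // reclaimed pods are not part of the estimation
	}

	var estimation float64
	if reclaimEnabled {
		for _, v := range metrics {
			if v > 0 { // skip missing metrics
				estimation += v
			}
		}
		estimation *= ratio
	} else {
		estimation = c.memoryRequest
	}

	if estimation <= 0 {
		estimation = memoryFallbackBytes
	}
	return estimation
}

func main() {
	c := containerSample{qosLevel: "shared_cores", memoryRequest: 4 << 30, rss: 2 << 30, cache: 1 << 30}
	fmt.Printf("reclaim on: %.0f bytes, reclaim off: %.0f bytes\n",
		estimateMemory(c, true), estimateMemory(c, false))
}
```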
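The arithmetic spread across calculateMemoryBuffer and PolicyCanonical.Update can also be read as two pure functions: a buffer derived from system free memory minus the extra requirement of non-reclaimed pods (optionally replaced by a cache-based term), and a headroom that adds the buffer on top of allocatable-minus-estimate and caps the result at allocatable. The sketch below assumes the inputs have already been fetched from the meta server and uses hypothetical field names; only the constants (the 2x watermark scale factor over 10000, the max/min capping) follow the diff.

```go
package main

import (
	"fmt"
	"math"
)

// bufferInputs is a hypothetical flat view of what the policy reads from the
// meta server and its configuration; field names only loosely follow the diff.
type bufferInputs struct {
	memTotal, memFree, scaleFactor, memUtilization float64
	sharedDedicatedUsed                            float64 // rss + cache + shmem of shared/dedicated QoS pods
	systemQoSExtra                                 float64 // cache + shmem of system QoS pods
	cpuMemRatio                                    float64 // cores per GB of memory capacity
	freeBasedRatio, cacheBasedRatio                float64
	cpuMemRatioLower, cpuMemRatioUpper             float64
	staticBasedCapacity                            float64
}

// memoryBuffer reproduces the arithmetic of calculateMemoryBuffer.
func memoryBuffer(in bufferInputs, nonReclaimedEstimate float64) float64 {
	// keep twice the watermark scale factor (1/10000 of total memory) out of the
	// buffer so kswapd is less likely to be triggered
	systemFactor := in.memTotal * 2 * in.scaleFactor / 10000
	systemBuffer := in.memFree*in.freeBasedRatio - systemFactor

	// memory the non-reclaimed pods are estimated to need beyond what they use now
	extra := nonReclaimedEstimate - in.sharedDedicatedUsed + in.systemQoSExtra
	buffer := math.Max(systemBuffer-extra, 0)

	// optional cache-based oversold when the node cpu:memory ratio is in range
	if in.cacheBasedRatio > 0 && in.cpuMemRatio > in.cpuMemRatioLower &&
		in.cpuMemRatio < in.cpuMemRatioUpper && in.memUtilization > 0 {
		cacheBuffer := (in.memTotal*(1-in.memUtilization) - in.memFree) * in.cacheBasedRatio
		buffer = math.Max(buffer, cacheBuffer)
	}

	// static oversold capacity is always added on top
	return buffer + in.staticBasedCapacity
}

// headroom mirrors PolicyCanonical.Update: base headroom plus buffer, capped at allocatable.
func headroom(total, reservedForAllocate, nonReclaimedEstimate, buffer float64) float64 {
	allocatable := total - reservedForAllocate
	h := math.Max(allocatable-nonReclaimedEstimate, 0) + buffer
	return math.Min(h, allocatable)
}

func main() {
	in := bufferInputs{
		memTotal: 100 << 30, memFree: 60 << 30, scaleFactor: 500, memUtilization: 0.4,
		sharedDedicatedUsed: 20 << 30, systemQoSExtra: 0,
		cpuMemRatio: 96.0 / 500, freeBasedRatio: 0.6,
		staticBasedCapacity: 20 << 30,
	}
	estimate := float64(32 << 30)
	b := memoryBuffer(in, estimate)
	fmt.Printf("buffer=%.2e headroom=%.2e\n", b, headroom(100<<30, 4<<30, estimate, b))
}
```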