diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 237faba04..127b2050b 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -88,8 +88,9 @@ func NewAgent(ctx context.Context, managers = appendManagerIfNotNil(managers, stateCollector) analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh) managers = appendManagerIfNotNil(managers, analyzerManager) - avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint) + avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.GetStateFunc()) managers = appendManagerIfNotNil(managers, avoidanceManager) + if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { tspName, err := agent.CreateNodeResourceTsp() if err != nil { diff --git a/pkg/ensurance/analyzer/analyzer.go b/pkg/ensurance/analyzer/analyzer.go index 91198adef..5530b8cea 100644 --- a/pkg/ensurance/analyzer/analyzer.go +++ b/pkg/ensurance/analyzer/analyzer.go @@ -1,6 +1,7 @@ package analyzer import ( + "container/heap" "fmt" "strings" "time" @@ -22,9 +23,9 @@ import ( ensurancelisters "github.com/gocrane/api/pkg/generated/listers/ensurance/v1alpha1" "github.com/gocrane/crane/pkg/common" "github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator" - execsort "github.com/gocrane/crane/pkg/ensurance/analyzer/sort" ecache "github.com/gocrane/crane/pkg/ensurance/cache" "github.com/gocrane/crane/pkg/ensurance/executor" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -188,7 +189,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) { return } - //step 4 :notice the enforcer manager + //step 4 : notice the enforcer manager s.notify(avoidanceAction) return @@ -296,6 +297,9 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida //step3 get and deduplicate throttlePods, throttleUpPods if action.Spec.Throttle != nil { throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap) + + // combine the throttle waterline + combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule) // combine the replicated pod combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods) } @@ -303,19 +307,14 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida //step4 get and deduplicate evictPods if action.Spec.Eviction != nil { evictPods := s.getEvictPods(ac.Triggered, action, stateMap) + + // combine the evict waterline + combineEvictWaterLine(&ae.EvictExecutor, ac) // combine the replicated pod combineEvictDuplicate(&ae.EvictExecutor, evictPods) } } - // sort the throttle executor by pod qos priority - execsort.CpuMetricsSorter(ae.ThrottleExecutor.ThrottleDownPods) - execsort.CpuMetricsSorter(ae.ThrottleExecutor.ThrottleUpPods) - ae.ThrottleExecutor.ThrottleUpPods = executor.Reverse(ae.ThrottleExecutor.ThrottleUpPods) - - // sort the evict executor by pod qos priority - execsort.CpuMetricsSorter(ae.EvictExecutor.EvictPods) - return ae } @@ -367,9 +366,7 @@ func (s *AnormalyAnalyzer) getTimeSeriesFromMap(state []common.TimeSeries, selec } func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) { - //step1: check need to notice enforcer 
manager - - //step2: notice by channel + //step1: notice by channel s.actionCh <- as return } @@ -389,7 +386,13 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool { } func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext, - action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) (throttlePods []executor.PodContext, throttleUpPods []executor.PodContext) { + action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) { + + throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{} + + if !(ac.Triggered || (enableSchedule && ac.Restored)) { + return throttlePods, throttleUpPods + } allPods, err := s.podLister.List(labels.Everything()) if err != nil { @@ -399,18 +402,18 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action for _, pod := range allPods { if ac.Triggered { - throttlePods = append(throttlePods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Throttle)) + throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown)) } if enableSchedule && ac.Restored { - throttleUpPods = append(throttleUpPods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Throttle)) + throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp)) } } return throttlePods, throttleUpPods } -func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []executor.PodContext { - evictPods := []executor.PodContext{} +func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []podinfo.PodContext { + evictPods := []podinfo.PodContext{} if triggered { allPods, err := s.podLister.List(labels.Everything()) @@ -420,8 +423,7 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo } for _, pod := range allPods { - evictPods = append(evictPods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Evict)) - + evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict)) } } return evictPods @@ -432,7 +434,7 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon // If any rules are triggered, the avoidance is true,otherwise the avoidance is false. // If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false. - // If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false. + // If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false. 
var enableScheduling, avoidance, restore bool defer func() { @@ -469,11 +471,11 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon if avoidance { s.lastTriggeredTime = now - ae.ScheduleExecutor.DisableClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} + ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} } if enableScheduling { - ae.ScheduleExecutor.RestoreClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} + ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} } return enableScheduling @@ -521,3 +523,53 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo } } } + +func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) { + if !(ac.Triggered || (enableSchedule && ac.Restored)) { + return + } + if ac.Triggered { + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.ThrottleDownWaterLine == nil { + e.ThrottleDownWaterLine = make(map[string]*executor.WaterLine) + } + // Use a heap here, so we don't need to use - as value, just use + if e.ThrottleDownWaterLine[ensurance.MetricRule.Name] == nil { + e.ThrottleDownWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{} + } + heap.Push(e.ThrottleDownWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value) + } + } + } + + if enableSchedule && ac.Restored { + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.ThrottleUpWaterLine == nil { + e.ThrottleUpWaterLine = make(map[string]*executor.WaterLine) + } + if e.ThrottleUpWaterLine[ensurance.MetricRule.Name] == nil { + e.ThrottleUpWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{} + } + heap.Push(e.ThrottleUpWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value) + } + } + } +} + +func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) { + if ac.Triggered { + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.EvictWaterLine == nil { + e.EvictWaterLine = make(map[string]*executor.WaterLine) + } + if e.EvictWaterLine[ensurance.MetricRule.Name] == nil { + e.EvictWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{} + } + heap.Push(e.EvictWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value) + } + } + } +} diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go index b7967e65c..528281504 100644 --- a/pkg/ensurance/collector/collector.go +++ b/pkg/ensurance/collector/collector.go @@ -36,6 +36,8 @@ type StateCollector struct { AnalyzerChann chan map[string][]common.TimeSeries NodeResourceChann chan map[string][]common.TimeSeries PodResourceChann chan map[string][]common.TimeSeries + state map[string][]common.TimeSeries + rw sync.RWMutex } func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsurancePolicyLister, podLister corelisters.PodLister, @@ -95,7 +97,7 @@ func (s *StateCollector) Run(stop <-chan struct{}) { start := time.Now() metrics.UpdateLastTime(string(known.ModuleStateCollector), metrics.StepMain, start) s.healthCheck.UpdateLastActivity(start) - s.Collect() + s.Collect(false) 
metrics.UpdateDurationFromStart(string(known.ModuleStateCollector), metrics.StepMain, start) case <-stop: klog.Infof("StateCollector exit") return } } return } -func (s *StateCollector) Collect() { +func (s *StateCollector) Collect(waterLine bool) { wg := sync.WaitGroup{} start := time.Now() var data = make(map[string][]common.TimeSeries) - var mux sync.Mutex s.collectors.Range(func(key, value interface{}) bool { c := value.(Collector) @@ -125,11 +126,11 @@ func (s *StateCollector) Collect() { defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleStateCollector), string(c.GetType()), metrics.StepCollect, start) if cdata, err := c.Collect(); err == nil { - mux.Lock() + s.rw.Lock() for key, series := range cdata { data[key] = series } - mux.Unlock() + s.rw.Unlock() } }(c, data) @@ -138,7 +139,12 @@ func (s *StateCollector) Collect() { wg.Wait() - s.AnalyzerChann <- data + // If Collect is called by waterline related logic rather than by StateCollector.Run, AnalyzerChann should not be updated, otherwise it would trigger recursive analysis and execution + if !waterLine { + s.AnalyzerChann <- data + } + + s.state = data if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { s.NodeResourceChann <- data @@ -245,3 +251,12 @@ func CheckMetricNameExist(name string) bool { return false } + +func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries { + return func() map[string][]common.TimeSeries { + s.Collect(true) + s.rw.RLock() + defer s.rw.RUnlock() + return s.state + } +} diff --git a/pkg/ensurance/collector/types/types.go b/pkg/ensurance/collector/types/types.go index 540a5507c..f56b72dd1 100644 --- a/pkg/ensurance/collector/types/types.go +++ b/pkg/ensurance/collector/types/types.go @@ -46,6 +46,7 @@ const ( MetricNetworkDropIn MetricName = "network_drop_in" MetricNetworkDropOut MetricName = "network_drop_out" + // Attention: this value is cpuUsageIncrease/timeIncrease, not cpuUsage MetricNameContainerCpuTotalUsage MetricName = "container_cpu_total_usage" MetricNameContainerCpuLimit MetricName = "container_cpu_limit" MetricNameContainerCpuQuota MetricName = "container_cpu_quota" diff --git a/pkg/ensurance/executor/evict.go b/pkg/ensurance/executor/evict.go index 012602102..3875cc564 100644 --- a/pkg/ensurance/executor/evict.go +++ b/pkg/ensurance/executor/evict.go @@ -9,6 +9,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -16,9 +18,11 @@ import ( type EvictExecutor struct { EvictPods EvictPods + // All metrics (not only metrics that can be quantified) mentioned in the triggered NodeQOSEnsurancePolicy and their corresponding waterlines + EvictWaterLine WaterLines } -type EvictPods []PodContext +type EvictPods []podinfo.PodContext func (e EvictPods) Find(key types.NamespacedName) int { for i, v := range e { @@ -45,41 +49,60 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { metrics.UpdateExecutorStatus(metrics.SubComponentEvict, metrics.StepAvoid, 1.0) metrics.ExecutorStatusCounterInc(metrics.SubComponentEvict, metrics.StepAvoid) - var bSucceed = true - var errPodKeys []string - - wg := sync.WaitGroup{} - - for i := range e.EvictPods { - wg.Add(1) - - go func(evictPod PodContext) { - defer wg.Done() - -
pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String()) - return + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + var totalReleased ReleaseResource + + /* The steps to evict: + 1. If EvictWaterLine has metrics that are not in WaterLineMetricsCanBeQualified, evict all evictPods and calculate the released resources, then return + 2. If a metric which is both in WaterLineMetricsCanBeQualified and EvictWaterLine doesn't have a usage value in the stateMap, which means we can't get the + accurate value from the collector, then evict all evictPods and return + 3. If there is a gap to the metrics in WaterLineMetricsCanBeQualified, evict finely according to the gap + 3.1 First sort pods by memory metrics, because memory is incompressible and more urgent; then evict the sorted pods one by one until there is + no gap to the waterline on memory usage + 3.2 Then sort pods by cpu metrics, and evict the sorted pods one by one until there is no gap to the waterline on cpu usage + */ + + // If there is a metric that can't be quantified, then we evict all selected evictPods and return + if e.EvictWaterLine.HasMetricNotInWaterLineMetrics() { + errPodKeys = e.evictPods(ctx, &totalReleased) + } else { + _, _, ctx.EvictGapToWaterLines = buildGapToWaterLine(ctx.getStateFunc(), ThrottleExecutor{}, *e) + if ctx.EvictGapToWaterLines.HasUsageMissedMetric() { + // If there is a metric in EvictGapToWaterLines (got from the triggered NodeQOSEnsurancePolicy) whose current usage can't be fetched, we have to evict all selected evictPods + errPodKeys = e.evictPods(ctx, &totalReleased) + } else { + // The metrics in EvictGapToWaterLines are all in WaterLineMetricsCanBeQualified and they have current usage, so we can evict precisely + wg := sync.WaitGroup{} + var released ReleaseResource + + // First evict pods according to incompressible resource: memory + execsort.MemMetricsSorter(e.EvictPods) + for !ctx.EvictGapToWaterLines.TargetGapsRemoved(MemUsage) { + if podinfo.HasNoExecutedPod(e.EvictPods) { + index := podinfo.GetFirstNoExecutedPod(e.EvictPods) + errKeys, released = e.evictOnePod(&wg, ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...) + ctx.EvictGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.EvictGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } } - - err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String()) - klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err) - return + // Then evict pods according to compressible resource: cpu + execsort.CpuMetricsSorter(e.EvictPods) + for !ctx.EvictGapToWaterLines.TargetGapsRemoved(CpuUsage) { + if podinfo.HasNoExecutedPod(e.EvictPods) { + index := podinfo.GetFirstNoExecutedPod(e.EvictPods) + errKeys, released = e.evictOnePod(&wg, ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...)
+ ctx.EvictGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.EvictGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } } - - metrics.ExecutorEvictCountsInc() - - klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod)) - }(e.EvictPods[i]) + wg.Wait() + } } - wg.Wait() - - if !bSucceed { + if len(errPodKeys) != 0 { return fmt.Errorf("some pod evict failed,err: %s", strings.Join(errPodKeys, ";")) } @@ -89,3 +112,43 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { func (e *EvictExecutor) Restore(ctx *ExecuteContext) error { return nil } + +func (e *EvictExecutor) evictPods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource) (errPodKeys []string) { + wg := sync.WaitGroup{} + for i := range e.EvictPods { + errKeys, _ := e.evictOnePod(&wg, ctx, i, totalReleasedResource) + errPodKeys = append(errPodKeys, errKeys...) + } + wg.Wait() + return +} + +func (e *EvictExecutor) evictOnePod(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { + wg.Add(1) + + go func(evictPod podinfo.PodContext) { + defer wg.Done() + + pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String()) + return + } + + err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds) + if err != nil { + errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String()) + klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err) + return + } + + metrics.ExecutorEvictCountsInc() + + klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod)) + + // Calculate release resources + released = ConstructRelease(evictPod, 0.0, 0.0) + totalReleasedResource.Add(released) + }(e.EvictPods[index]) + return +} diff --git a/pkg/ensurance/executor/executor.go b/pkg/ensurance/executor/executor.go index 0675a2e8c..c6a3e2869 100644 --- a/pkg/ensurance/executor/executor.go +++ b/pkg/ensurance/executor/executor.go @@ -11,6 +11,7 @@ import ( pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" + "github.com/gocrane/crane/pkg/common" cgrpc "github.com/gocrane/crane/pkg/ensurance/grpc" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" "github.com/gocrane/crane/pkg/known" @@ -29,11 +30,13 @@ type ActionExecutor struct { runtimeClient pb.RuntimeServiceClient runtimeConn *grpc.ClientConn + + getStateFunc func() map[string][]common.TimeSeries } // NewActionExecutor create enforcer manager func NewActionExecutor(client clientset.Interface, nodeName string, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, - noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string) *ActionExecutor { + noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string, getStateFunc func() map[string][]common.TimeSeries) *ActionExecutor { runtimeClient, runtimeConn, err := cruntime.GetRuntimeClient(runtimeEndpoint) if err != nil { @@ -51,6 +54,7 @@ func NewActionExecutor(client clientset.Interface, nodeName string, podInformer nodeSynced: nodeInformer.Informer().HasSynced, runtimeClient: runtimeClient, runtimeConn: runtimeConn, + getStateFunc: getStateFunc, } } @@ -103,6 +107,7 @@ func (a *ActionExecutor) execute(ae AvoidanceExecutor, _ <-chan struct{}) error NodeLister: a.nodeLister, RuntimeClient: a.runtimeClient, RuntimeConn: a.runtimeConn, + getStateFunc: a.getStateFunc, } //step1 do enforcer actions diff --git a/pkg/ensurance/executor/interface.go 
b/pkg/ensurance/executor/interface.go index 5b39921a2..e9802347d 100644 --- a/pkg/ensurance/executor/interface.go +++ b/pkg/ensurance/executor/interface.go @@ -5,6 +5,8 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + + "github.com/gocrane/crane/pkg/common" ) type Executor interface { @@ -25,4 +27,15 @@ type ExecuteContext struct { NodeLister corelisters.NodeLister RuntimeClient pb.RuntimeServiceClient RuntimeConn *grpc.ClientConn + + // Gap for metrics in WaterLineMetricsCanBeQualified + // Key is the metric name in WaterLineMetricsCanBeQualified, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleDown action) + ThrottoleDownGapToWaterLines GapToWaterLines + // Key is the metric name in WaterLineMetricsCanBeQualified, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleUp action) + ThrottoleUpGapToWaterLines GapToWaterLines + // key is the metric name in WaterLineMetricsCanBeQualified, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use evict action) + // Only has metrics that can be quantified + EvictGapToWaterLines GapToWaterLines + + getStateFunc func() map[string][]common.TimeSeries } diff --git a/pkg/ensurance/executor/pod_info.go b/pkg/ensurance/executor/pod-info/pod_info.go similarity index 70% rename from pkg/ensurance/executor/pod_info.go rename to pkg/ensurance/executor/pod-info/pod_info.go index 2b2f12187..44de76bc9 100644 --- a/pkg/ensurance/executor/pod_info.go +++ b/pkg/ensurance/executor/pod-info/pod_info.go @@ -1,6 +1,8 @@ -package executor +package pod_info import ( + "fmt" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -21,10 +23,63 @@ type ClassAndPriority struct { type PodType string const ( - Throttle PodType = "Throttle" - Evict PodType = "Evict" + ThrottleDown PodType = "ThrottleDown" + ThrottleUp PodType = "ThrottleUp" + Evict PodType = "Evict" ) +type ContainerUsage struct { + ContainerName string + ContainerId string + Value float64 +} + +func GetUsageById(usages []ContainerUsage, containerId string) (ContainerUsage, error) { + for _, v := range usages { + if v.ContainerId == containerId { + return v, nil + } + } + + return ContainerUsage{}, fmt.Errorf("containerUsage not found") +} + +func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerUsage) { + var podUsage = 0.0 + var containerUsages []ContainerUsage + var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} + state, ok := stateMap[metricName] + if !ok { + return podUsage, containerUsages + } + for _, vv := range state { + var labelMaps = common.Labels2Maps(vv.Labels) + if utils.ContainMaps(labelMaps, podMaps) { + if labelMaps[common.LabelNameContainerId] == "" { + podUsage = vv.Samples[0].Value + } else { + containerUsages = append(containerUsages, ContainerUsage{ContainerId: labelMaps[common.LabelNameContainerId], + ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) + } + } + } + + return podUsage, containerUsages +} + +type CPURatio struct { + //the min of cpu ratio for pods + MinCPURatio uint64 `json:"minCPURatio,omitempty"` + + //the step of cpu share and limit for once down-size (1-100) + StepCPURatio uint64 `json:"stepCPURatio,omitempty"` +} + +type 
MemoryThrottleExecutor struct { + // to force gc the page cache of low level pods + ForceGC bool `json:"forceGC,omitempty"` +} + type PodContext struct { PodKey types.NamespacedName ClassAndPriority ClassAndPriority @@ -41,12 +96,32 @@ type PodContext struct { ExtCpuRequest int64 StartTime *metav1.Time - podType PodType + PodType PodType CPUThrottle CPURatio MemoryThrottle MemoryThrottleExecutor DeletionGracePeriodSeconds *int32 + + HasBeenActioned bool +} + +func HasNoExecutedPod(pods []PodContext) bool { + for _, p := range pods { + if p.HasBeenActioned == false { + return true + } + } + return false +} + +func GetFirstNoExecutedPod(pods []PodContext) int { + for index, p := range pods { + if p.HasBeenActioned == false { + return index + } + } + return -1 } func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction, podType PodType) PodContext { @@ -65,7 +140,7 @@ func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries, act podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) - podContext.podType = podType + podContext.PodType = podType return podContext } diff --git a/pkg/ensurance/executor/release_resource.go b/pkg/ensurance/executor/release_resource.go new file mode 100644 index 000000000..a31c2b497 --- /dev/null +++ b/pkg/ensurance/executor/release_resource.go @@ -0,0 +1,41 @@ +package executor + +import podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + +type ReleaseResource struct { + CpuUsage float64 + MemUsage float64 +} + +// TODO: add memory usage +func ConstructRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource { + if pod.PodType == podinfo.Evict { + return ReleaseResource{ + CpuUsage: pod.PodCPUUsage, + } + } + if pod.PodType == podinfo.ThrottleDown { + reduction := currentContainerCpuUsage - containerCPUQuotaNew + if reduction > 0 { + return ReleaseResource{ + CpuUsage: reduction, + } + } + return ReleaseResource{} + } + if pod.PodType == podinfo.ThrottleUp { + reduction := containerCPUQuotaNew - currentContainerCpuUsage + if reduction > 0 { + return ReleaseResource{ + CpuUsage: reduction, + } + } + return ReleaseResource{} + } + return ReleaseResource{} +} + +func (r *ReleaseResource) Add(new ReleaseResource) { + r.MemUsage += new.MemUsage + r.CpuUsage += new.CpuUsage +} diff --git a/pkg/ensurance/executor/schedule.go b/pkg/ensurance/executor/schedule.go index 9af6bb25d..663ad227e 100644 --- a/pkg/ensurance/executor/schedule.go +++ b/pkg/ensurance/executor/schedule.go @@ -6,6 +6,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -16,8 +17,8 @@ const ( ) type ScheduleExecutor struct { - DisableClassAndPriority *ClassAndPriority - RestoreClassAndPriority *ClassAndPriority + DisableClassAndPriority *podinfo.ClassAndPriority + RestoreClassAndPriority *podinfo.ClassAndPriority } func (b *ScheduleExecutor) Avoid(ctx *ExecuteContext) error { diff --git a/pkg/ensurance/analyzer/sort/cpu_metrics_sort.go b/pkg/ensurance/executor/sort/cpu_metrics_sort.go similarity index 72% rename from pkg/ensurance/analyzer/sort/cpu_metrics_sort.go rename to pkg/ensurance/executor/sort/cpu_metrics_sort.go index 2b932273b..8c61fdb72 100644 --- 
a/pkg/ensurance/analyzer/sort/cpu_metrics_sort.go +++ b/pkg/ensurance/executor/sort/cpu_metrics_sort.go @@ -1,17 +1,18 @@ package sort import ( - "github.com/gocrane/crane/pkg/ensurance/executor" - "github.com/gocrane/crane/pkg/utils" v1 "k8s.io/api/core/v1" + + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + "github.com/gocrane/crane/pkg/utils" ) -func CpuMetricsSorter(pods []executor.PodContext) { - orderedBy(classAndPriority, cpuUsage, runningTime).Sort(pods) +func CpuMetricsSorter(pods []podinfo.PodContext) { + orderedBy(classAndPriority, cpuUsage, extCpuUsage, runningTime).Sort(pods) } -// extCpuUsage compares the partition extcpu usage of extcpu limit -func extCpuUsage(p1, p2 executor.PodContext) int32 { +// extCpuUsage compares the partition of extcpu usage to extcpu limit +func extCpuUsage(p1, p2 podinfo.PodContext) int32 { // if both pod don't use ext resource, then return if p1.ExtCpuBeUsed == false && p2.ExtCpuBeUsed == false { return 0 @@ -24,7 +25,7 @@ func extCpuUsage(p1, p2 executor.PodContext) int32 { } // cpuUsage compares the partition extcpu usage of extcpu limit -func cpuUsage(p1, p2 executor.PodContext) int32 { +func cpuUsage(p1, p2 podinfo.PodContext) int32 { var p1usage, p2usage float64 // if both pod is PodQOSBestEffort, then compare the absolute usage;otherwise, cmpare the ratio compared with PodCPUQuota if p1.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort && p2.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort { @@ -38,7 +39,7 @@ func cpuUsage(p1, p2 executor.PodContext) int32 { } // extCpuBeUsed compares pod by using ext resource whether -func extCpuBeUsed(p1, p2 executor.PodContext) int32 { +func extCpuBeUsed(p1, p2 podinfo.PodContext) int32 { use1 := utils.TransBool2Uint(p1.ExtCpuBeUsed) use2 := utils.TransBool2Uint(p2.ExtCpuBeUsed) diff --git a/pkg/ensurance/analyzer/sort/mem_metrics_sort.go b/pkg/ensurance/executor/sort/mem_metrics_sort.go similarity index 51% rename from pkg/ensurance/analyzer/sort/mem_metrics_sort.go rename to pkg/ensurance/executor/sort/mem_metrics_sort.go index fac007da1..9fa01e742 100644 --- a/pkg/ensurance/analyzer/sort/mem_metrics_sort.go +++ b/pkg/ensurance/executor/sort/mem_metrics_sort.go @@ -1,9 +1,9 @@ package sort -import "github.com/gocrane/crane/pkg/ensurance/executor" +import podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" // Todo: Memory metrics related sort func need to be filled -func MemMetricsSorter(pods []executor.PodContext) { +func MemMetricsSorter(pods []podinfo.PodContext) { orderedBy(classAndPriority, runningTime).Sort(pods) } diff --git a/pkg/ensurance/analyzer/sort/pod_sort.go b/pkg/ensurance/executor/sort/pod_sort.go similarity index 81% rename from pkg/ensurance/analyzer/sort/pod_sort.go rename to pkg/ensurance/executor/sort/pod_sort.go index 2146a99bd..fc805e73a 100644 --- a/pkg/ensurance/analyzer/sort/pod_sort.go +++ b/pkg/ensurance/executor/sort/pod_sort.go @@ -5,13 +5,13 @@ import ( "k8s.io/klog/v2" - "github.com/gocrane/crane/pkg/ensurance/executor" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" ) // RankFunc sorts the pods -type RankFunc func(pods []executor.PodContext) +type RankFunc func(pods []podinfo.PodContext) -var sortFunc = map[string]func(p1, p2 executor.PodContext) int32{ +var sortFunc = map[string]func(p1, p2 podinfo.PodContext) int32{ "ExtCpuBeUsed": extCpuBeUsed, "ClassAndPriority": classAndPriority, "ExtCpuUsage": extCpuUsage, @@ -40,7 +40,7 @@ func RankFuncConstruct(customize []string) RankFunc { } // runningTime compares 
pods by pod's start time -func runningTime(p1, p2 executor.PodContext) int32 { +func runningTime(p1, p2 podinfo.PodContext) int32 { t1 := p1.StartTime t2 := p2.StartTime @@ -61,8 +61,8 @@ func runningTime(p1, p2 executor.PodContext) int32 { } // classAndPriority compares pods by pod's ClassAndPriority -func classAndPriority(p1, p2 executor.PodContext) int32 { - return executor.CompareClassAndPriority(p1.ClassAndPriority, p2.ClassAndPriority) +func classAndPriority(p1, p2 podinfo.PodContext) int32 { + return podinfo.CompareClassAndPriority(p1.ClassAndPriority, p2.ClassAndPriority) } // Cmp compares p1 and p2 and returns: @@ -71,16 +71,16 @@ func classAndPriority(p1, p2 executor.PodContext) int32 { // 0 if p1 == p2 // +1 if p1 > p2 // -type cmpFunc func(p1, p2 executor.PodContext) int32 +type cmpFunc func(p1, p2 podinfo.PodContext) int32 // podSorter implements the Sort interface, sorting changes within. type podSorter struct { - pods []executor.PodContext + pods []podinfo.PodContext cmp []cmpFunc } // Sort sorts the argument slice according to the less functions passed to orderedBy. -func (ms *podSorter) Sort(pods []executor.PodContext) { +func (ms *podSorter) Sort(pods []podinfo.PodContext) { ms.pods = pods sort.Sort(ms) } diff --git a/pkg/ensurance/executor/throttle.go b/pkg/ensurance/executor/throttle.go index a907e2748..8fc740a07 100644 --- a/pkg/ensurance/executor/throttle.go +++ b/pkg/ensurance/executor/throttle.go @@ -9,7 +9,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - "github.com/gocrane/crane/pkg/common" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" @@ -25,9 +26,12 @@ const ( type ThrottleExecutor struct { ThrottleDownPods ThrottlePods ThrottleUpPods ThrottlePods + // All metrics(not only metrics that can be quantified) metioned in triggerd NodeQOSEnsurancePolicy and their corresponding waterlines + ThrottleDownWaterLine WaterLines + ThrottleUpWaterLine WaterLines } -type ThrottlePods []PodContext +type ThrottlePods []podinfo.PodContext func (t ThrottlePods) Find(podTypes types.NamespacedName) int { for i, v := range t { @@ -39,8 +43,8 @@ func (t ThrottlePods) Find(podTypes types.NamespacedName) int { return -1 } -func Reverse(t ThrottlePods) []PodContext { - throttle := []PodContext(t) +func Reverse(t ThrottlePods) []podinfo.PodContext { + throttle := []podinfo.PodContext(t) l := len(throttle) for i := 0; i < l/2; i++ { throttle[i], throttle[l-i-1] = throttle[l-i-1], throttle[i] @@ -48,35 +52,6 @@ func Reverse(t ThrottlePods) []PodContext { return throttle } -type CPURatio struct { - //the min of cpu ratio for pods - MinCPURatio uint64 `json:"minCPURatio,omitempty"` - - //the step of cpu share and limit for once down-size (1-100) - StepCPURatio uint64 `json:"stepCPURatio,omitempty"` -} - -type MemoryThrottleExecutor struct { - // to force gc the page cache of low level pods - ForceGC bool `json:"forceGC,omitempty"` -} - -type ContainerUsage struct { - ContainerName string - ContainerId string - Value float64 -} - -func GetUsageById(usages []ContainerUsage, containerId string) (ContainerUsage, error) { - for _, v := range usages { - if v.ContainerId == containerId { - return v, nil - } - } - - return ContainerUsage{}, fmt.Errorf("containerUsage not found") -} - func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { var start = 
time.Now() metrics.UpdateLastTimeWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepAvoid, start) @@ -91,88 +66,147 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { metrics.ExecutorStatusCounterInc(metrics.SubComponentThrottle, metrics.StepAvoid) } - var bSucceed = true - var errPodKeys []string + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + var totalReleased ReleaseResource + + /* The steps to throttle: + 1. If ThrottleDownWaterLine has metrics that are not in WaterLineMetricsCanBeQualified, throttle all ThrottleDownPods and calculate the released resources, then return + 2. If a metric which is both in WaterLineMetricsCanBeQualified and ThrottleDownWaterLine doesn't have a usage value in the stateMap, which means we can't get the + accurate value from the collector, then throttle all ThrottleDownPods and return + 3. If there is a gap to the metrics in WaterLineMetricsCanBeQualified, throttle finely according to the gap + 3.1 First sort pods by memory metrics, because memory is incompressible and more urgent; then throttle the sorted pods one by one until there is + no gap to the waterline on memory usage + 3.2 Then sort pods by cpu metrics, and throttle the sorted pods one by one until there is no gap to the waterline on cpu usage + */ + + // If there is a metric that can't be quantified, then we throttle all selected throttlePods and return + if t.ThrottleDownWaterLine.HasMetricNotInWaterLineMetrics() { + errPodKeys = t.throttlePods(ctx, &totalReleased) + } else { + ctx.ThrottoleDownGapToWaterLines, _, _ = buildGapToWaterLine(ctx.getStateFunc(), *t, EvictExecutor{}) + if ctx.ThrottoleDownGapToWaterLines.HasUsageMissedMetric() { + // If there is a metric in ThrottoleDownGapToWaterLines (got from the triggered NodeQOSEnsurancePolicy) whose current usage can't be fetched, + // we have to throttle all selected ThrottleDownPods + errPodKeys = t.throttlePods(ctx, &totalReleased) + } else { + // The metrics in ThrottoleDownGapToWaterLines are all in WaterLineMetricsCanBeQualified and have current usage, so we can throttle precisely + var released ReleaseResource + + // First throttle pods according to incompressible resource: memory + execsort.MemMetricsSorter(t.ThrottleDownPods) + for !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(MemUsage) { + if podinfo.HasNoExecutedPod(t.ThrottleDownPods) { + index := podinfo.GetFirstNoExecutedPod(t.ThrottleDownPods) + errKeys, released = t.throttleOnePod(ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...) + ctx.ThrottoleDownGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.ThrottoleDownGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } + } + // Then throttle pods according to compressible resource: cpu + execsort.CpuMetricsSorter(t.ThrottleDownPods) + for !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(CpuUsage) { + if podinfo.HasNoExecutedPod(t.ThrottleDownPods) { + index := podinfo.GetFirstNoExecutedPod(t.ThrottleDownPods) + errKeys, released = t.throttleOnePod(ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...)
+ ctx.ThrottoleDownGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.ThrottoleDownGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } + } + } + } + + if len(errPodKeys) != 0 { + return fmt.Errorf("some pod throttle failed,err: %s", strings.Join(errPodKeys, ";")) + } - for _, throttlePod := range t.ThrottleDownPods { + return nil +} - pod, err := ctx.PodLister.Pods(throttlePod.PodKey.Namespace).Get(throttlePod.PodKey.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", throttlePod.PodKey.String())) +func (t *ThrottleExecutor) throttlePods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource) (errPodKeys []string) { + for i := range t.ThrottleDownPods { + errKeys, _ := t.throttleOnePod(ctx, i, totalReleasedResource) + errPodKeys = append(errPodKeys, errKeys...) + } + return +} + +func (t *ThrottleExecutor) throttleOnePod(ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { + pod, err := ctx.PodLister.Pods(t.ThrottleDownPods[index].PodKey.Namespace).Get(t.ThrottleDownPods[index].PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", t.ThrottleDownPods[index].PodKey.String())) + return + } + + // Throttle for CPU metrics + + for _, v := range t.ThrottleDownPods[index].ContainerCPUUsages { + // pause container to skip + if v.ContainerName == "" { continue } - for _, v := range throttlePod.ContainerCPUUsages { - // pause container to skip - if v.ContainerName == "" { - continue - } - - klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName) + klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName) - containerCPUQuota, err := GetUsageById(throttlePod.ContainerCPUQuotas, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue - } + containerCPUQuota, err := podinfo.GetUsageById(t.ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleDownPods[index].PodKey.String()) + continue + } - containerCPUPeriod, err := GetUsageById(throttlePod.ContainerCPUPeriods, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue - } + containerCPUPeriod, err := podinfo.GetUsageById(t.ThrottleDownPods[index].ContainerCPUPeriods, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleDownPods[index].PodKey.String()) + continue + } - container, err := utils.GetPodContainerByName(pod, v.ContainerName) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue - } + container, err := utils.GetPodContainerByName(pod, v.ContainerName) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleDownPods[index].PodKey.String()) + continue + } - var containerCPUQuotaNew float64 - if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { - containerCPUQuotaNew = v.Value * (1.0 - float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) - } else { - containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 - float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) - } + var containerCPUQuotaNew float64 + if utils.AlmostEqual(containerCPUQuota.Value, 
-1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { + containerCPUQuotaNew = v.Value * (1.0 - float64(t.ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } else { + containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 - float64(t.ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } - if requestCPU, ok := container.Resources.Requests[v1.ResourceCPU]; ok { - if float64(requestCPU.MilliValue())/CpuQuotaCoefficient > containerCPUQuotaNew { - containerCPUQuotaNew = float64(requestCPU.MilliValue()) / CpuQuotaCoefficient - } + if requestCPU, ok := container.Resources.Requests[v1.ResourceCPU]; ok { + if float64(requestCPU.MilliValue())/CpuQuotaCoefficient > containerCPUQuotaNew { + containerCPUQuotaNew = float64(requestCPU.MilliValue()) / CpuQuotaCoefficient } + } - if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { - if float64(limitCPU.MilliValue())/CpuQuotaCoefficient*float64(throttlePod.CPUThrottle.MinCPURatio)/MaxRatio > containerCPUQuotaNew { - containerCPUQuotaNew = float64(limitCPU.MilliValue()) * float64(throttlePod.CPUThrottle.MinCPURatio) / CpuQuotaCoefficient - } + if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { + if float64(limitCPU.MilliValue())/CpuQuotaCoefficient*float64(t.ThrottleDownPods[index].CPUThrottle.MinCPURatio)/MaxRatio > containerCPUQuotaNew { + containerCPUQuotaNew = float64(limitCPU.MilliValue()) * float64(t.ThrottleDownPods[index].CPUThrottle.MinCPURatio) / CpuQuotaCoefficient } + } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f", - containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f", + containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) - if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) - if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", throttlePod.PodKey.String(), v.ContainerName, err)) - bSucceed = false - continue - } else { - klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", - klog.KObj(pod), v.ContainerName, containerCPUQuotaNew) - } + if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) + if err != nil { + errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", t.ThrottleDownPods[index].PodKey.String(), v.ContainerName, err)) + continue + } else { + klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", + klog.KObj(pod), v.ContainerName, containerCPUQuotaNew) } } + released = ConstructRelease(t.ThrottleDownPods[index], containerCPUQuotaNew, v.Value) + totalReleasedResource.Add(released) } + return - if !bSucceed { - return fmt.Errorf("some pod throttle failed,err: %s", strings.Join(errPodKeys, ";")) - } + //TODO: Throttle Others,such as memroy... 
- return nil } func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { @@ -182,7 +216,7 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { klog.V(6).Info("ThrottleExecutor restore, %v", *t) - if len(t.ThrottleDownPods) == 0 { + if len(t.ThrottleUpPods) == 0 { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepRestore, 0) return nil } @@ -190,121 +224,159 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepRestore, 1.0) metrics.ExecutorStatusCounterInc(metrics.SubComponentThrottle, metrics.StepRestore) - var bSucceed = true - var errPodKeys []string + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + var totalReleased ReleaseResource + + /* The steps to restore: + 1. If ThrottleUpWaterLine has metrics that are not in WaterLineMetricsCanBeQualified, restore all ThrottleUpPods and calculate the resource difference, then return + 2. If a metric which is both in WaterLineMetricsCanBeQualified and ThrottleUpWaterLine doesn't have a usage value in the stateMap, which means we can't get the + accurate value from the collector, then restore all ThrottleUpPods and return + 3. If there is a gap to the metrics in WaterLineMetricsCanBeQualified, restore finely according to the gap + 3.1 First sort pods by memory metrics, because memory is incompressible and more urgent; then restore the sorted pods one by one until there is + no gap to the waterline on memory usage + 3.2 Then sort pods by cpu metrics, and restore the sorted pods one by one until there is no gap to the waterline on cpu usage + */ + + // If there is a metric that can't be quantified, then we restore all selected ThrottleUpPods and return + if t.ThrottleUpWaterLine.HasMetricNotInWaterLineMetrics() { + errPodKeys = t.restorePods(ctx, &totalReleased) + } else { + _, ctx.ThrottoleUpGapToWaterLines, _ = buildGapToWaterLine(ctx.getStateFunc(), *t, EvictExecutor{}) + if ctx.ThrottoleUpGapToWaterLines.HasUsageMissedMetric() { + // If there is a metric in ThrottoleUpGapToWaterLines (got from the triggered NodeQOSEnsurancePolicy) whose current usage can't be fetched, + // we have to restore all selected ThrottleUpPods + errPodKeys = t.restorePods(ctx, &totalReleased) + } else { + // The metrics in ThrottoleUpGapToWaterLines are all in WaterLineMetricsCanBeQualified and have current usage, so we can restore precisely + var released ReleaseResource + + // We first restore for incompressible resource: memory + execsort.MemMetricsSorter(t.ThrottleUpPods) + t.ThrottleUpPods = Reverse(t.ThrottleUpPods) + for !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(MemUsage) { + if podinfo.HasNoExecutedPod(t.ThrottleUpPods) { + index := podinfo.GetFirstNoExecutedPod(t.ThrottleUpPods) + errKeys, released = t.restoreOnePod(ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...) + ctx.ThrottoleUpGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.ThrottoleUpGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } + } + // Then restore for compressible resource: cpu + execsort.CpuMetricsSorter(t.ThrottleUpPods) + t.ThrottleUpPods = Reverse(t.ThrottleUpPods) + for !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(CpuUsage) { + if podinfo.HasNoExecutedPod(t.ThrottleUpPods) { + index := podinfo.GetFirstNoExecutedPod(t.ThrottleUpPods) + errKeys, released = t.restoreOnePod(ctx, index, &totalReleased) + errPodKeys = append(errPodKeys, errKeys...)
+ ctx.ThrottoleUpGapToWaterLines[string(MemUsage)] -= released.MemUsage + ctx.ThrottoleUpGapToWaterLines[string(CpuUsage)] -= released.CpuUsage + } + } + } + } - for _, throttlePod := range t.ThrottleUpPods { + if len(errPodKeys) != 0 { + return fmt.Errorf("some pod throttle restore failed,err: %s", strings.Join(errPodKeys, ";")) + } - pod, err := ctx.PodLister.Pods(throttlePod.PodKey.Namespace).Get(throttlePod.PodKey.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "not found ", throttlePod.PodKey.String()) + return nil +} + +func (t *ThrottleExecutor) restorePods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource) (errPodKeys []string) { + for i := range t.ThrottleUpPods { + errKeys, _ := t.restoreOnePod(ctx, i, totalReleasedResource) + errPodKeys = append(errPodKeys, errKeys...) + } + return +} + +func (t *ThrottleExecutor) restoreOnePod(ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { + pod, err := ctx.PodLister.Pods(t.ThrottleUpPods[index].PodKey.Namespace).Get(t.ThrottleUpPods[index].PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, "not found ", t.ThrottleUpPods[index].PodKey.String()) + return + } + + // Restore for CPU metric + for _, v := range t.ThrottleUpPods[index].ContainerCPUUsages { + + // pause container to skip + if v.ContainerName == "" { continue } - for _, v := range throttlePod.ContainerCPUUsages { + klog.V(6).Infof("ThrottleExecutor restore container %s/%s", klog.KObj(pod), v.ContainerName) - // pause container to skip - if v.ContainerName == "" { - continue - } + containerCPUQuota, err := podinfo.GetUsageById(t.ThrottleUpPods[index].ContainerCPUQuotas, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleUpPods[index].PodKey.String()) + continue + } - klog.V(6).Infof("ThrottleExecutor1 restore container %s/%s", klog.KObj(pod), v.ContainerName) + containerCPUPeriod, err := podinfo.GetUsageById(t.ThrottleUpPods[index].ContainerCPUPeriods, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleUpPods[index].PodKey.String()) + continue + } - containerCPUQuota, err := GetUsageById(throttlePod.ContainerCPUQuotas, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue - } + container, err := utils.GetPodContainerByName(pod, v.ContainerName) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), t.ThrottleUpPods[index].PodKey.String()) + continue + } - containerCPUPeriod, err := GetUsageById(throttlePod.ContainerCPUPeriods, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue - } + var containerCPUQuotaNew float64 + if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { + continue + } else { + containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 + float64(t.ThrottleUpPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } - container, err := utils.GetPodContainerByName(pod, v.ContainerName) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodKey.String()) - continue + if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { + if float64(limitCPU.MilliValue())/CpuQuotaCoefficient < containerCPUQuotaNew { + containerCPUQuotaNew = float64(limitCPU.MilliValue()) / 
CpuQuotaCoefficient } - - var containerCPUQuotaNew float64 - if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { - continue - } else { - containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 + float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) + } else { + usage, hasExtRes := utils.GetExtCpuRes(container) + if hasExtRes { + containerCPUQuotaNew = float64(usage.MilliValue()) / CpuQuotaCoefficient } - - if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { - if float64(limitCPU.MilliValue())/CpuQuotaCoefficient < containerCPUQuotaNew { - containerCPUQuotaNew = float64(limitCPU.MilliValue()) / CpuQuotaCoefficient - } - } else { - usage, hasExtRes := utils.GetExtCpuRes(container) - if hasExtRes { - containerCPUQuotaNew = float64(usage.MilliValue()) / CpuQuotaCoefficient - } - if !hasExtRes && containerCPUQuotaNew > MaxUpQuota*containerCPUPeriod.Value/CpuQuotaCoefficient { - containerCPUQuotaNew = -1 - } - + if !hasExtRes && containerCPUQuotaNew > MaxUpQuota*containerCPUPeriod.Value/CpuQuotaCoefficient { + containerCPUQuotaNew = -1 } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f", - klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) - - if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { - - if utils.AlmostEqual(containerCPUQuotaNew, -1) { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)}) - if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), throttlePod.PodKey.String()) - bSucceed = false - continue - } - } else { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) - if err != nil { - klog.Errorf("Failed to updateResource, err %s", err.Error()) - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), throttlePod.PodKey.String()) - bSucceed = false - continue - } - } - } } - } - if !bSucceed { - return fmt.Errorf("some pod throttle restore failed,err: %s", strings.Join(errPodKeys, ";")) - } + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f", + klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) - return nil -} + if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { -func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerUsage) { - var podUsage = 0.0 - var containerUsages []ContainerUsage - var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} - state, ok := stateMap[metricName] - if !ok { - return podUsage, containerUsages - } - for _, vv := range state { - var labelMaps = common.Labels2Maps(vv.Labels) - if utils.ContainMaps(labelMaps, podMaps) { - if labelMaps[common.LabelNameContainerId] == "" { - podUsage = vv.Samples[0].Value + if utils.AlmostEqual(containerCPUQuotaNew, -1) { + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)}) + if err != nil { + errPodKeys = append(errPodKeys, 
fmt.Sprintf("Failed to updateResource, err %s", err.Error()), t.ThrottleUpPods[index].PodKey.String()) + continue + } } else { - containerUsages = append(containerUsages, ContainerUsage{ContainerId: labelMaps[common.LabelNameContainerId], - ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) + if err != nil { + klog.Errorf("Failed to updateResource, err %s", err.Error()) + errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), t.ThrottleUpPods[index].PodKey.String()) + continue + } } } + released = ConstructRelease(t.ThrottleUpPods[index], containerCPUQuotaNew, v.Value) + totalReleasedResource.Add(released) + + t.ThrottleUpPods[index].HasBeenActioned = true } - return podUsage, containerUsages + // Restore other resource, such as memory + + return } diff --git a/pkg/ensurance/executor/waterline.go b/pkg/ensurance/executor/waterline.go new file mode 100644 index 000000000..ff4654047 --- /dev/null +++ b/pkg/ensurance/executor/waterline.go @@ -0,0 +1,186 @@ +package executor + +import ( + "math" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + + "github.com/gocrane/crane/pkg/common" + "github.com/gocrane/crane/pkg/ensurance/collector/types" +) + +// Metrics that can be measured for waterLine +type WaterLineMetric string + +// Be consistent with metrics in collector/types/types.go +const ( + CpuUsage = WaterLineMetric(types.MetricNameCpuTotalUsage) + MemUsage = WaterLineMetric(types.MetricNameMemoryTotalUsage) +) + +const ( + // We can't get current use, so can't do actions precisely, just evict every evictedPod + MissedCurrentUsage float64 = math.MaxFloat64 +) + +var WaterLineMetricsCanBeQualified = [...]WaterLineMetric{CpuUsage, MemUsage} + +// An WaterLine is a min-heap of Quantity. 
+// Key is the metric name, value is the difference between the current usage and the smallest waterline
+type GapToWaterLines map[string]float64
+
+// Only calculate gaps for metrics that can be quantified
+func buildGapToWaterLine(stateMap map[string][]common.TimeSeries,
+	throttleExecutor ThrottleExecutor, evictExecutor EvictExecutor) (
+	throttleDownGapToWaterLines, throttleUpGapToWaterLines, eviceGapToWaterLines GapToWaterLines) {
+
+	//// Update stateMap
+	//ctx.stateMap = stateMap
+	throttleDownGapToWaterLines, throttleUpGapToWaterLines, eviceGapToWaterLines = make(map[string]float64), make(map[string]float64), make(map[string]float64)
+
+	for _, m := range WaterLineMetricsCanBeQualified {
+		// Get the series for each metric
+		series, ok := stateMap[string(m)]
+		if !ok {
+			klog.Errorf("metric %s not found from collector stateMap", string(m))
+			// Can't get the current usage, so we can't act precisely; just evict every pod selected for eviction
+			throttleDownGapToWaterLines[string(m)] = MissedCurrentUsage
+			throttleUpGapToWaterLines[string(m)] = MissedCurrentUsage
+			eviceGapToWaterLines[string(m)] = MissedCurrentUsage
+			continue
+		}
+
+		// Find the biggest used value
+		var maxUsed float64
+		for _, ts := range series {
+			if ts.Samples[0].Value > maxUsed {
+				maxUsed = ts.Samples[0].Value
+			}
+		}
+
+		// Get the waterLine for each metric in WaterLineMetricsCanBeQualified
+		throttleDownWaterLine, throttleDownExist := throttleExecutor.ThrottleDownWaterLine[string(m)]
+		throttleUpWaterLine, throttleUpExist := throttleExecutor.ThrottleUpWaterLine[string(m)]
+		evictWaterLine, evictExist := evictExecutor.EvictWaterLine[string(m)]
+
+		// If a metric does not exist in ThrottleDownWaterLine, its gap in throttleDownGapToWaterLines can't be calculated
+		if !throttleDownExist {
+			delete(throttleDownGapToWaterLines, string(m))
+		} else {
+			throttleDownGapToWaterLines[string(m)] = maxUsed - float64(throttleDownWaterLine.PopSmallest().Value())
+		}
+
+		// If a metric does not exist in ThrottleUpWaterLine, its gap in throttleUpGapToWaterLines can't be calculated
+		if !throttleUpExist {
+			delete(throttleUpGapToWaterLines, string(m))
+		} else {
+			// Attention: the sign is reversed compared with throttleDown and evict
+			throttleUpGapToWaterLines[string(m)] = float64(throttleUpWaterLine.PopSmallest().Value()) - maxUsed
+		}
+
+		// If a metric does not exist in EvictWaterLine, its gap in eviceGapToWaterLines can't be calculated
+		if !evictExist {
+			delete(eviceGapToWaterLines, string(m))
+		} else {
+			eviceGapToWaterLines[string(m)] = maxUsed - float64(evictWaterLine.PopSmallest().Value())
+		}
+	}
+	return
+}
+
+// GapsAllRemoved reports whether every gap in GapToWaterLines has been closed
+func (g GapToWaterLines) GapsAllRemoved() bool {
+	for _, v := range g {
+		if v > 0 {
+			return false
+		}
+	}
+	return true
+}
+
+// TargetGapsRemoved reports whether the gap for the specified metric in GapToWaterLines has been closed
+func (g GapToWaterLines) TargetGapsRemoved(metric WaterLineMetric) bool {
+	val, ok := g[string(metric)]
+	if !ok {
+		return true
+	}
+	if val <= 0 {
+		return true
+	}
+	return false
+}
+
+// HasUsageMissedMetric reports whether GapToWaterLines contains a metric whose current usage could not be collected
+func (g GapToWaterLines) HasUsageMissedMetric() bool {
+	for _, v := range g {
+		if v == MissedCurrentUsage {
+			return true
+		}
+	}
+	return false
+}
+
+// Key is the metric name, values are the waterlines built from each objectiveEnsurance.metricRule.value
+type WaterLines map[string]*WaterLine
+
+// HasMetricNotInWaterLineMetrics reports whether WaterLines e contains a metric that is not in WaterLineMetricsCanBeQualified
+func (e WaterLines) HasMetricNotInWaterLineMetrics() bool {
+	for metric := range e {
+		var existInWaterLineMetrics = false
+		for _, v := range WaterLineMetricsCanBeQualified {
+			if metric == string(v) {
+				existInWaterLineMetrics = true
+				break
+			}
+		}
+		if !existInWaterLineMetrics {
+			return true
+		}
+	}
+	return false
+}
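// Editor's note: this block is not part of the patch. It is a hedged sketch of
// how a caller might consume one of the GapToWaterLines maps returned by
// buildGapToWaterLine above; the real throttle/evict executors in this PR may
// drive these helpers differently. releaseOnePod is a hypothetical callback
// standing in for throttling or evicting a single pod.
package executor

import "k8s.io/klog/v2"

func drainCpuGapSketch(gaps GapToWaterLines, releaseOnePod func() (releasedCpuMilli float64)) {
	if gaps.HasUsageMissedMetric() {
		// Some metric had no usage sample, so we can't act precisely;
		// a caller would fall back to acting on every candidate pod.
		klog.Warning("usage missing for some metric, falling back to acting on all candidate pods")
		return
	}
	// Keep acting until the gap between usage and the smallest CPU waterline is gone.
	for !gaps.TargetGapsRemoved(CpuUsage) {
		released := releaseOnePod()
		if released <= 0 {
			return // nothing left to act on
		}
		gaps[string(CpuUsage)] -= released
	}
}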
diff --git a/pkg/ensurance/executor/waterline_test.go b/pkg/ensurance/executor/waterline_test.go
new file mode 100644
index 000000000..126ac3f09
--- /dev/null
+++ b/pkg/ensurance/executor/waterline_test.go
@@ -0,0 +1,46 @@
+package executor
+
+import (
+	"container/heap"
+	"strconv"
+	"testing"
+
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+func (w WaterLine) verify(t *testing.T, i int) {
+	t.Helper()
+	n := w.Len()
+	j1 := 2*i + 1
+	j2 := 2*i + 2
+	if j1 < n {
+		if w.Less(j1, i) {
+			t.Errorf("heap invariant invalidated [%d] = %d > [%d] = %d", i, w[i].Value(), j1, w[j1].Value())
+			return
+		}
+		w.verify(t, j1)
+	}
+	if j2 < n {
+		if w.Less(j2, i) {
+			t.Errorf("heap invariant invalidated [%d] = %d > [%d] = %d", i, w[i].Value(), j2, w[j2].Value())
+			return
+		}
+		w.verify(t, j2)
+	}
+}
+
+func TestSort(t *testing.T) {
+	h := WaterLine{}
+
+	for i := 20; i > 0; i-- {
+		heap.Push(&h, resource.MustParse(strconv.Itoa(i)+"m"))
+	}
+
+	h.verify(t, 0)
+
+	t.Logf("waterline is %s", h.String())
+	for i := 1; h.Len() > 0; i++ {
+		heap.Pop(&h)
+		h.verify(t, 0)
+	}
+}
diff --git a/pkg/resource/pod_resource_manger.go b/pkg/resource/pod_resource_manger.go
index 87e654115..03dd07d00 100644
--- a/pkg/resource/pod_resource_manger.go
+++ b/pkg/resource/pod_resource_manger.go
@@ -20,6 +20,7 @@ import (
 	"github.com/gocrane/crane/pkg/ensurance/collector/cadvisor"
 	stypes "github.com/gocrane/crane/pkg/ensurance/collector/types"
 	"github.com/gocrane/crane/pkg/ensurance/executor"
+	podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
 	cgrpc "github.com/gocrane/crane/pkg/ensurance/grpc"
 	cruntime "github.com/gocrane/crane/pkg/ensurance/runtime"
 	"github.com/gocrane/crane/pkg/known"
@@ -147,7 +148,7 @@ func (o *PodResourceManager) updatePodExtResToCgroup(pod *v1.Pod) {
 	start := time.Now()
 	metrics.UpdateLastTime(string(known.ModulePodResourceManager), metrics.StepUpdatePodResource, start)
 
-	_, containerCPUQuotas := executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), o.state, pod)
+	_, containerCPUQuotas := podinfo.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), o.state, pod)
 
 	for _, c := range pod.Spec.Containers {
 		if state := utils.GetContainerStatus(pod, c); state.Running == nil {
@@ -163,7 +164,7 @@
 		}
 
 		// If container's quota is -1, pod resource manager will convert limit to quota
-		containerCPUQuota, err := executor.GetUsageById(containerCPUQuotas, containerId)
+		containerCPUQuota, err := podinfo.GetUsageById(containerCPUQuotas, containerId)
 		if err != nil {
 			klog.Error(err)
 		}
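// Editor's note: this block is not part of the patch. It is a self-contained
// sketch of the call pattern used by the hunks around this point now that
// GetPodUsage and GetUsageById live in the pod-info package: read a pod-level
// metric from the collector state, then pick a single container's sample by
// container id. logContainerQuota is a hypothetical helper, not crane code.
package resource

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"github.com/gocrane/crane/pkg/common"
	stypes "github.com/gocrane/crane/pkg/ensurance/collector/types"
	podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
)

func logContainerQuota(state map[string][]common.TimeSeries, pod *v1.Pod, containerId string) {
	// The pod-level value is returned first; the per-container samples follow.
	_, containerCPUQuotas := podinfo.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), state, pod)

	quota, err := podinfo.GetUsageById(containerCPUQuotas, containerId)
	if err != nil {
		klog.Error(err)
		return
	}
	klog.V(6).Infof("container %s cpu quota: %.2f", quota.ContainerName, quota.Value)
}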
@@ -195,7 +196,7 @@ func (o *PodResourceManager) getCPUPeriod(pod *v1.Pod, containerId string) float64 {
 	now := time.Now()
 	if o.state != nil && !now.After(o.lastStateTime.Add(StateExpiration)) {
-		_, containerCPUPeriods := executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), o.state, pod)
+		_, containerCPUPeriods := podinfo.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), o.state, pod)
 		for _, period := range containerCPUPeriods {
 			if period.ContainerId == containerId {
 				return period.Value