diff --git a/cmd/crane-agent/app/agent.go b/cmd/crane-agent/app/agent.go index 8fd1da2b7..840007945 100644 --- a/cmd/crane-agent/app/agent.go +++ b/cmd/crane-agent/app/agent.go @@ -99,8 +99,8 @@ func Run(ctx context.Context, opts *options.Options) error { actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions() tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions() - newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient, - podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval) + newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient, podInformer, nodeInformer, + nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess) if err != nil { return err diff --git a/cmd/crane-agent/app/options/option.go b/cmd/crane-agent/app/options/option.go index 205a40d97..830873bc1 100644 --- a/cmd/crane-agent/app/options/option.go +++ b/cmd/crane-agent/app/options/option.go @@ -26,6 +26,8 @@ type Options struct { // Ifaces is the network devices to collect metric Ifaces []string NodeResourceReserved map[string]string + // ExecuteExcess is the percentage by which executions exceed the gap between current usage and waterlines + ExecuteExcess string } // NewOptions builds an empty options. @@ -54,4 +56,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0") flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. 
cpu=40%,memory=40%)") flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min") + flags.StringVar(&o.ExecuteExcess, "execute-excess", "10%", "The percentage of executions that exceed the gap between current usage and waterlines, default: 10%.") } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 237faba04..3af477e15 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -65,6 +65,7 @@ func NewAgent(ctx context.Context, ifaces []string, healthCheck *metrics.HealthCheck, CollectInterval time.Duration, + executeExcess string, ) (*Agent, error) { var managers []manager.Manager var noticeCh = make(chan executor.AvoidanceExecutor) @@ -88,8 +89,9 @@ func NewAgent(ctx context.Context, managers = appendManagerIfNotNil(managers, stateCollector) analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh) managers = appendManagerIfNotNil(managers, analyzerManager) - avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint) + avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess) managers = appendManagerIfNotNil(managers, avoidanceManager) + if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { tspName, err := agent.CreateNodeResourceTsp() if err != nil { diff --git a/pkg/ensurance/analyzer/analyzer.go b/pkg/ensurance/analyzer/analyzer.go index 9928078e0..96727ae69 100644 --- a/pkg/ensurance/analyzer/analyzer.go +++ b/pkg/ensurance/analyzer/analyzer.go @@ -1,15 +1,14 @@ package analyzer import ( + "container/heap" "fmt" - "sort" "strings" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -25,8 +24,8 @@ import ( "github.com/gocrane/crane/pkg/common" "github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator" ecache "github.com/gocrane/crane/pkg/ensurance/cache" - stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" "github.com/gocrane/crane/pkg/ensurance/executor" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -74,6 +73,7 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset, eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "crane-agent"}) + return &AnormalyAnalyzer{ nodeName: nodeName, evaluator: expressionEvaluator, @@ -180,7 +180,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) { } } - klog.V(6).Infof("Analyze actionContexts: %v", actionContexts) + klog.V(6).Infof("Analyze actionContexts: %#v", actionContexts) //step 3 : merge avoidanceAction := s.merge(state, avoidanceMaps, actionContexts) @@ -189,7 +189,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) { return } - //step 4 :notice the enforcer manager + //step 4 : notice the enforcer manager s.notify(avoidanceAction) 
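// ---------------------------------------------------------------------------
// [Editor's sketch] The merge step above ends up in combineThrottleWaterLine /
// combineEvictWaterLine (added further down in this file), which push every
// matching MetricRule.Value onto an executor.WaterLine via the newly imported
// container/heap package. The WaterLine type itself is not part of this diff,
// so the code below is only a minimal, assumed shape that would be compatible
// with those heap.Push calls: a heap of resource.Quantity values with the
// smallest (most restrictive) waterline on top. The names waterLine and
// lowestWaterLine, and the min-first ordering, are illustrative assumptions,
// not the actual implementation.
package sketch

import (
	"container/heap"

	"k8s.io/apimachinery/pkg/api/resource"
)

type waterLine []resource.Quantity

func (w waterLine) Len() int            { return len(w) }
func (w waterLine) Less(i, j int) bool  { return w[i].Cmp(w[j]) < 0 }
func (w waterLine) Swap(i, j int)       { w[i], w[j] = w[j], w[i] }
func (w *waterLine) Push(x interface{}) { *w = append(*w, x.(resource.Quantity)) }
func (w *waterLine) Pop() interface{} {
	old := *w
	n := len(old)
	x := old[n-1]
	*w = old[:n-1]
	return x
}

// lowestWaterLine shows the intended effect: after pushing several waterlines
// for one metric, the smallest value sits at index 0.
func lowestWaterLine() resource.Quantity {
	w := &waterLine{}
	heap.Push(w, resource.MustParse("6"))
	heap.Push(w, resource.MustParse("4"))
	return (*w)[0] // "4"
}
// ---------------------------------------------------------------------------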
return @@ -208,7 +208,7 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea for _, ts := range series { triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value) - klog.V(6).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered, + klog.V(4).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered, object.MetricRule.Name, ts.Samples[0].Value, common.GetValueByName(ts.Labels, common.LabelNamePodNamespace), @@ -238,7 +238,7 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu //step2: check if triggered for NodeQOSEnsurance threshold := s.trigger(series, object) - klog.V(4).Infof("for NodeQOS %s, metrics reach the threshold: %v", key, threshold) + klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold) //step3: check is triggered action or restored, set the detection s.computeActionContext(threshold, key, object, &ac) @@ -297,24 +297,26 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida //step3 get and deduplicate throttlePods, throttleUpPods if action.Spec.Throttle != nil { throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap) + + // combine the throttle waterline + combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule) // combine the replicated pod combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods) } //step4 get and deduplicate evictPods if action.Spec.Eviction != nil { - evictPods := s.getEvictPods(ac.Triggered, action) + evictPods := s.getEvictPods(ac.Triggered, action, stateMap) + + // combine the evict waterline + combineEvictWaterLine(&ae.EvictExecutor, ac) // combine the replicated pod combineEvictDuplicate(&ae.EvictExecutor, evictPods) } } + ae.StateMap = stateMap - // sort the throttle executor by pod qos priority - sort.Sort(ae.ThrottleExecutor.ThrottleDownPods) - sort.Sort(sort.Reverse(ae.ThrottleExecutor.ThrottleUpPods)) - - // sort the evict executor by pod qos priority - sort.Sort(ae.EvictExecutor.EvictPods) + klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor) return ae } @@ -367,9 +369,7 @@ func (s *AnormalyAnalyzer) getTimeSeriesFromMap(state []common.TimeSeries, selec } func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) { - //step1: check need to notice enforcer manager - - //step2: notice by channel + //step1: notice by channel s.actionCh <- as return } @@ -389,9 +389,13 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool { } func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext, - action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]executor.ThrottlePod, []executor.ThrottlePod) { + action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) { + + throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{} - throttlePods, throttleUpPods := []executor.ThrottlePod{}, []executor.ThrottlePod{} + if !ac.Triggered && !(enableSchedule && ac.Restored) { + return throttlePods, throttleUpPods + } allPods, err := s.podLister.List(labels.Everything()) if err != nil { @@ -401,18 +405,18 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action for _, pod := range allPods { if ac.Triggered { - throttlePods = 
append(throttlePods, throttlePodConstruct(pod, stateMap, action)) + throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown)) } if enableSchedule && ac.Restored { - throttleUpPods = append(throttleUpPods, throttlePodConstruct(pod, stateMap, action)) + throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp)) } } return throttlePods, throttleUpPods } -func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction) []executor.EvictPod { - evictPods := []executor.EvictPod{} +func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []podinfo.PodContext { + evictPods := []podinfo.PodContext{} if triggered { allPods, err := s.podLister.List(labels.Everything()) @@ -421,10 +425,8 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo return evictPods } - for _, v := range allPods { - var classAndPriority = executor.ClassAndPriority{PodQOSClass: v.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(v.Spec.Priority, 0)} - evictPods = append(evictPods, executor.EvictPod{DeletionGracePeriodSeconds: action.Spec.Eviction.TerminationGracePeriodSeconds, - PodKey: types.NamespacedName{Name: v.Name, Namespace: v.Namespace}, ClassAndPriority: classAndPriority}) + for _, pod := range allPods { + evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict)) } } return evictPods @@ -435,7 +437,7 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon // If any rules are triggered, the avoidance is true,otherwise the avoidance is false. // If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false. - // If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false. + // If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false. 
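// ---------------------------------------------------------------------------
// [Editor's sketch] A minimal, self-contained restatement of the rule in the
// comment block above. The names scheduleDecision, triggered, restored,
// lastTriggeredTime and coolDown are illustrative only and do not claim to
// match the fields that disableSchedulingMerge actually uses below.
package sketch

import "time"

// scheduleDecision mirrors the comment: any triggered rule forces avoidance;
// restore requires that no rule is triggered and at least one is restored;
// scheduling is re-enabled only once the cool-down has elapsed since the last
// trigger.
func scheduleDecision(triggered, restored int, lastTriggeredTime time.Time, coolDown time.Duration, now time.Time) (avoidance, restore, enableScheduling bool) {
	avoidance = triggered > 0
	restore = !avoidance && restored > 0
	enableScheduling = restore && now.Sub(lastTriggeredTime) >= coolDown
	return
}
// ---------------------------------------------------------------------------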
var enableScheduling, avoidance, restore bool defer func() { @@ -472,36 +474,19 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon if avoidance { s.lastTriggeredTime = now - ae.ScheduleExecutor.DisableClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} + ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} } if enableScheduling { - ae.ScheduleExecutor.RestoreClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} + ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} } return enableScheduling } -func throttlePodConstruct(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction) executor.ThrottlePod { - var throttlePod executor.ThrottlePod - var qosPriority = executor.ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)} - - throttlePod.PodTypes = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} - throttlePod.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) - throttlePod.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) - - throttlePod.PodCPUUsage, throttlePod.ContainerCPUUsages = executor.GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod) - throttlePod.PodCPUShare, throttlePod.ContainerCPUShares = executor.GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod) - throttlePod.PodCPUQuota, throttlePod.ContainerCPUQuotas = executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod) - throttlePod.PodCPUPeriod, throttlePod.ContainerCPUPeriods = executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod) - throttlePod.PodQOSPriority = qosPriority - - return throttlePod -} - func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) { for _, t := range throttlePods { - if i := e.ThrottleDownPods.Find(t.PodTypes); i == -1 { + if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 { e.ThrottleDownPods = append(e.ThrottleDownPods, t) } else { if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio { @@ -513,9 +498,9 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott } } } - for _, t := range throttleUpPods { - if i := e.ThrottleUpPods.Find(t.PodTypes); i == -1 { + for _, t := range throttleUpPods { + if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 { e.ThrottleUpPods = append(e.ThrottleUpPods, t) } else { if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio { @@ -541,3 +526,68 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo } } } + +func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) { + if !ac.Triggered && !(enableSchedule && ac.Restored) { + return + } + + if ac.Triggered { + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.ThrottleDownWaterLine == nil { + e.ThrottleDownWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine) + } + // Use a heap here, so we don't need to use - as value, just use + if e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == 
nil { + e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{} + } + heap.Push(e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value) + } + } + } + + for waterLineMetric, waterlines := range e.ThrottleDownWaterLine { + klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } + + if enableSchedule && ac.Restored { + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.ThrottleUpWaterLine == nil { + e.ThrottleUpWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine) + } + if e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil { + e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{} + } + heap.Push(e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value) + } + } + } + + for waterLineMetric, waterlines := range e.ThrottleUpWaterLine { + klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } +} + +func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) { + if !ac.Triggered { + return + } + + for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { + if ensurance.Name == ac.ObjectiveEnsuranceName { + if e.EvictWaterLine == nil { + e.EvictWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine) + } + if e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil { + e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{} + } + heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value) + } + } + + for waterLineMetric, waterlines := range e.EvictWaterLine { + klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } +} diff --git a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go index 2c3fe7805..ad97e844b 100644 --- a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go +++ b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go @@ -4,7 +4,9 @@ package cadvisor import ( + "math" "net/http" + "reflect" "strconv" "time" @@ -127,6 +129,16 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { for key, v := range containers { containerId := utils.GetContainerIdFromKey(key) containerName := utils.GetContainerNameFromPod(pod, containerId) + klog.V(6).Infof("Key is %s, containerId is %s, containerName is %s", key, containerId, containerName) + + if reflect.DeepEqual(cadvisorapiv2.ContainerInfo{}, v) { + klog.Warning("ContainerInfo is nil") + } else { + if len(v.Stats) == 0 { + klog.Warning("ContainerInfo.Stats is empty") + } + } + // Filter the sandbox container if (containerId != "") && (containerName == "") { continue @@ -145,10 +157,11 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { } if state, ok := c.latestContainersStates[key]; ok { + klog.V(8).Infof("For key %s, LatestContainersStates exist", key) var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes) cpuUsageSample, schedRunqueueTime := caculateCPUUsage(&v, &state) - if cpuUsageSample == 0 && schedRunqueueTime == 0 { + if cpuUsageSample == 0 && schedRunqueueTime == 0 || math.IsNaN(cpuUsageSample) { continue } if hasExtRes { @@ -160,7 +173,7 @@ func 
(c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { addSampleToStateMap(types.MetricNameContainerCpuQuota, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Quota), now), stateMap) addSampleToStateMap(types.MetricNameContainerCpuPeriod, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Period), now), stateMap) - klog.V(10).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f", klog.KObj(pod), containerName, key, schedRunqueueTime) + klog.V(8).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f, container_cpu_total_usage %#v", klog.KObj(pod), containerName, key, schedRunqueueTime, cpuUsageSample) } containerStates[key] = ContainerState{stat: v, timestamp: now} } @@ -202,6 +215,7 @@ func caculateCPUUsage(info *cadvisorapiv2.ContainerInfo, state *ContainerState) cpuUsageIncrease := info.Stats[0].Cpu.Usage.Total - state.stat.Stats[0].Cpu.Usage.Total schedRunqueueTimeIncrease := info.Stats[0].Cpu.Schedstat.RunqueueTime - state.stat.Stats[0].Cpu.Schedstat.RunqueueTime timeIncrease := info.Stats[0].Timestamp.UnixNano() - state.stat.Stats[0].Timestamp.UnixNano() + cpuUsageSample := float64(cpuUsageIncrease) / float64(timeIncrease) schedRunqueueTime := float64(schedRunqueueTimeIncrease) * 1000 * 1000 / float64(timeIncrease) return cpuUsageSample, schedRunqueueTime diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go index b7967e65c..e21024d0d 100644 --- a/pkg/ensurance/collector/collector.go +++ b/pkg/ensurance/collector/collector.go @@ -36,6 +36,8 @@ type StateCollector struct { AnalyzerChann chan map[string][]common.TimeSeries NodeResourceChann chan map[string][]common.TimeSeries PodResourceChann chan map[string][]common.TimeSeries + State map[string][]common.TimeSeries + rw sync.RWMutex } func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsurancePolicyLister, podLister corelisters.PodLister, @@ -43,6 +45,7 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura analyzerChann := make(chan map[string][]common.TimeSeries) nodeResourceChann := make(chan map[string][]common.TimeSeries) podResourceChann := make(chan map[string][]common.TimeSeries) + State := make(map[string][]common.TimeSeries) return &StateCollector{ nodeName: nodeName, nepLister: nepLister, @@ -57,6 +60,7 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura collectors: &sync.Map{}, cadvisorManager: manager, exclusiveCPUSet: exclusiveCPUSet, + State: State, } } @@ -95,7 +99,7 @@ func (s *StateCollector) Run(stop <-chan struct{}) { start := time.Now() metrics.UpdateLastTime(string(known.ModuleStateCollector), metrics.StepMain, start) s.healthCheck.UpdateLastActivity(start) - s.Collect() + s.Collect(false) metrics.UpdateDurationFromStart(string(known.ModuleStateCollector), metrics.StepMain, start) case <-stop: klog.Infof("StateCollector exit") @@ -107,13 +111,10 @@ func (s *StateCollector) Run(stop <-chan struct{}) { return } -func (s *StateCollector) Collect() { +func (s *StateCollector) Collect(waterLine bool) { wg := sync.WaitGroup{} start := time.Now() - var data = make(map[string][]common.TimeSeries) - var mux sync.Mutex - s.collectors.Range(func(key, value interface{}) bool { c := value.(Collector) @@ -125,27 +126,27 @@ func (s *StateCollector) Collect() { defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleStateCollector), string(c.GetType()), metrics.StepCollect, start) if cdata, err := c.Collect(); 
err == nil { - mux.Lock() + s.rw.Lock() for key, series := range cdata { data[key] = series } - mux.Unlock() + s.rw.Unlock() } - }(c, data) + }(c, s.State) return true }) wg.Wait() - s.AnalyzerChann <- data + s.AnalyzerChann <- s.State if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { - s.NodeResourceChann <- data + s.NodeResourceChann <- s.State } if podResource := utilfeature.DefaultFeatureGate.Enabled(features.CranePodResource); podResource { - s.PodResourceChann <- data + s.PodResourceChann <- s.State } } @@ -245,3 +246,10 @@ func CheckMetricNameExist(name string) bool { return false } + +func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries { + return func() map[string][]common.TimeSeries { + s.Collect(true) + return s.State + } +} diff --git a/pkg/ensurance/collector/nodelocal/nodelocal.go b/pkg/ensurance/collector/nodelocal/nodelocal.go index d4857d752..c11092864 100644 --- a/pkg/ensurance/collector/nodelocal/nodelocal.go +++ b/pkg/ensurance/collector/nodelocal/nodelocal.go @@ -81,7 +81,7 @@ func (n *NodeLocal) Collect() (map[string][]common.TimeSeries, error) { } } - klog.V(10).Info("Node local collecting, status: %v", status) + klog.V(6).Info("Node local collecting, status: %#v", status) return status, nil } diff --git a/pkg/ensurance/collector/types/types.go b/pkg/ensurance/collector/types/types.go index 540a5507c..f56b72dd1 100644 --- a/pkg/ensurance/collector/types/types.go +++ b/pkg/ensurance/collector/types/types.go @@ -46,6 +46,7 @@ const ( MetricNetworkDropIn MetricName = "network_drop_in" MetricNetworkDropOut MetricName = "network_drop_out" + // Attention: this value is cpuUsageIncrease/timeIncrease, not cpuUsage MetricNameContainerCpuTotalUsage MetricName = "container_cpu_total_usage" MetricNameContainerCpuLimit MetricName = "container_cpu_limit" MetricNameContainerCpuQuota MetricName = "container_cpu_quota" diff --git a/pkg/ensurance/executor/cpu_usage.go b/pkg/ensurance/executor/cpu_usage.go new file mode 100644 index 000000000..a521907ff --- /dev/null +++ b/pkg/ensurance/executor/cpu_usage.go @@ -0,0 +1,256 @@ +package executor + +import ( + "fmt" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + "github.com/gocrane/crane/pkg/ensurance/executor/sort" + cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" + "github.com/gocrane/crane/pkg/metrics" + "github.com/gocrane/crane/pkg/utils" +) + +func init() { + registerMetricMap(cpu_usage) +} + +var cpu_usage = metric{ + Name: CpuUsage, + ActionPriority: 5, + SortAble: true, + SortFunc: sort.CpuUsageSorter, + + ThrottleAble: true, + ThrottleQuantified: true, + ThrottleFunc: throttleOnePodCpu, + RestoreFunc: restoreOnePodCpu, + + EvictAble: true, + EvictQuantified: true, + EvictFunc: evictOnePodCpu, +} + +func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { + pod, err := ctx.PodLister.Pods(ThrottleDownPods[index].PodKey.Namespace).Get(ThrottleDownPods[index].PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", ThrottleDownPods[index].PodKey.String())) + return + } + + // Throttle for CPU metrics + + klog.V(6).Infof("index %d, containerusage is %#v", index, ThrottleDownPods[index].ContainerCPUUsages) + + for _, v := range ThrottleDownPods[index].ContainerCPUUsages { + // pause container to skip + if 
v.ContainerName == "" { + continue + } + + klog.V(4).Infof("ThrottleExecutor begin to avoid container %s/%s", klog.KObj(pod), v.ContainerName) + + containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + continue + } + + containerCPUPeriod, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUPeriods, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + continue + } + + container, err := utils.GetPodContainerByName(pod, v.ContainerName) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + continue + } + + var containerCPUQuotaNew float64 + if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { + containerCPUQuotaNew = v.Value * (1.0 - float64(ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } else { + containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 - float64(ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } + + if requestCPU, ok := container.Resources.Requests[v1.ResourceCPU]; ok { + if float64(requestCPU.MilliValue())/CpuQuotaCoefficient > containerCPUQuotaNew { + containerCPUQuotaNew = float64(requestCPU.MilliValue()) / CpuQuotaCoefficient + } + } + + if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { + if float64(limitCPU.MilliValue())/CpuQuotaCoefficient*float64(ThrottleDownPods[index].CPUThrottle.MinCPURatio)/MaxRatio > containerCPUQuotaNew { + containerCPUQuotaNew = float64(limitCPU.MilliValue()) * float64(ThrottleDownPods[index].CPUThrottle.MinCPURatio) / CpuQuotaCoefficient + } + } + + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f", + containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value) + + if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) + if err != nil { + errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", ThrottleDownPods[index].PodKey.String(), v.ContainerName, err)) + continue + } else { + klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", + klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) + + released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value) + klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].PodKey.String(), container.Name, released[CpuUsage]) + + totalReleasedResource.Add(released) + } + } + } + return +} + +func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { + pod, err := ctx.PodLister.Pods(ThrottleUpPods[index].PodKey.Namespace).Get(ThrottleUpPods[index].PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, "not found ", ThrottleUpPods[index].PodKey.String()) + return + } + + // Restore for CPU metric + for _, v := range ThrottleUpPods[index].ContainerCPUUsages { + // pause container to skip + if 
v.ContainerName == "" { + continue + } + + klog.V(6).Infof("ThrottleExecutor restore container %s/%s", klog.KObj(pod), v.ContainerName) + + containerCPUQuota, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUQuotas, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + continue + } + + containerCPUPeriod, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUPeriods, v.ContainerId) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + continue + } + + container, err := utils.GetPodContainerByName(pod, v.ContainerName) + if err != nil { + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + continue + } + + var containerCPUQuotaNew float64 + if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { + continue + } else { + containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 + float64(ThrottleUpPods[index].CPUThrottle.StepCPURatio)/MaxRatio) + } + + if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { + if float64(limitCPU.MilliValue())/CpuQuotaCoefficient < containerCPUQuotaNew { + containerCPUQuotaNew = float64(limitCPU.MilliValue()) / CpuQuotaCoefficient + } + } else { + usage, hasExtRes := utils.GetExtCpuRes(container) + if hasExtRes { + containerCPUQuotaNew = float64(usage.MilliValue()) / CpuQuotaCoefficient + } + if !hasExtRes && containerCPUQuotaNew > MaxUpQuota*containerCPUPeriod.Value/CpuQuotaCoefficient { + containerCPUQuotaNew = -1 + } + + } + + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f", + containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value) + + if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { + if utils.AlmostEqual(containerCPUQuotaNew, -1) { + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)}) + if err != nil { + errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String()) + continue + } + } else { + err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) + if err != nil { + klog.Errorf("Failed to updateResource, err %s", err.Error()) + errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String()) + continue + } + klog.V(4).Infof("ThrottleExecutor restore pod %s, container %s, set cpu quota %.2f, .", + klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) + released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value) + klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].PodKey, container.Name, released[CpuUsage]) + + totalReleasedResource.Add(released) + } + } + } + + return +} + +func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) { + wg.Add(1) + + // Calculate release resources + released = ConstructCpuUsageRelease(EvictPods[index], 0.0, 0.0) + totalReleasedResource.Add(released) + + go func(evictPod 
podinfo.PodContext) { + defer wg.Done() + + pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name) + if err != nil { + errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String()) + return + } + + err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds) + if err != nil { + errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String()) + klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err) + return + } + + metrics.ExecutorEvictCountsInc() + + klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod)) + }(EvictPods[index]) + return +} + +func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource { + if pod.PodType == podinfo.Evict { + return ReleaseResource{ + CpuUsage: pod.PodCPUUsage * CpuQuotaCoefficient, + } + } + if pod.PodType == podinfo.ThrottleDown { + reduction := (currentContainerCpuUsage - containerCPUQuotaNew) * CpuQuotaCoefficient + if reduction > 0 { + return ReleaseResource{ + CpuUsage: reduction, + } + } + return ReleaseResource{} + } + if pod.PodType == podinfo.ThrottleUp { + reduction := (containerCPUQuotaNew - currentContainerCpuUsage) * CpuQuotaCoefficient + if reduction > 0 { + return ReleaseResource{ + CpuUsage: reduction, + } + } + return ReleaseResource{} + } + return ReleaseResource{} +} diff --git a/pkg/ensurance/executor/evict.go b/pkg/ensurance/executor/evict.go index 6c636c6da..b73203d64 100644 --- a/pkg/ensurance/executor/evict.go +++ b/pkg/ensurance/executor/evict.go @@ -9,28 +9,19 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" - "github.com/gocrane/crane/pkg/utils" ) type EvictExecutor struct { EvictPods EvictPods + // All metrics(not only can be quantified metrics) metioned in triggerd NodeQOSEnsurancePolicy and their corresponding waterlines + EvictWaterLine WaterLines } -type EvictPod struct { - DeletionGracePeriodSeconds *int32 - PodKey types.NamespacedName - ClassAndPriority ClassAndPriority -} - -type EvictPods []EvictPod - -func (e EvictPods) Len() int { return len(e) } -func (e EvictPods) Swap(i, j int) { e[i], e[j] = e[j], e[i] } -func (e EvictPods) Less(i, j int) bool { - return e[i].ClassAndPriority.Less(e[j].ClassAndPriority) -} +type EvictPods []podinfo.PodContext func (e EvictPods) Find(key types.NamespacedName) int { for i, v := range e { @@ -57,41 +48,76 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { metrics.UpdateExecutorStatus(metrics.SubComponentEvict, metrics.StepAvoid, 1.0) metrics.ExecutorStatusCounterInc(metrics.SubComponentEvict, metrics.StepAvoid) - var bSucceed = true - var errPodKeys []string + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + totalReleased := ReleaseResource{} - wg := sync.WaitGroup{} + /* The step to evict: + 1. If EvictWaterLine has metrics that can't be quantified, select a evictable metric which has the highest action priority, use its EvictFunc to evict all selected pods, then return + 2. 
Get the gaps between current usage and waterlines + 2.1 If there is a metric that can't get current usage, select a evictable metric which has the highest action priority, use its EvictFunc to evict all selected pods, then return + 2.2 Traverse metrics that can be quantified, if there is gap for the metric, then sort candidate pods by its SortFunc if exists, otherwise use GeneralSorter by default. + Then evict sorted pods one by one util there is no gap to waterline + */ - for i := range e.EvictPods { - wg.Add(1) + metricsEvictQuantified, MetricsNotEvcitQuantified := e.EvictWaterLine.DivideMetricsByEvictQuantified() - go func(evictPod EvictPod) { - defer wg.Done() + // There is a metric that can't be EvictQuantified, so evict all selected pods + if len(MetricsNotEvcitQuantified) != 0 { + klog.V(6).Info("There is a metric that can't be EvcitQuantified") - pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String()) - return + highestPriorityMetric := e.EvictWaterLine.GetHighestPriorityEvictAbleMetric() + if highestPriorityMetric != "" { + klog.V(6).Infof("The highestPriorityMetric is %s", highestPriorityMetric) + errPodKeys = e.evictPods(ctx, &totalReleased, highestPriorityMetric) + } + } else { + _, _, ctx.EvictGapToWaterLines = buildGapToWaterLine(ctx.stateMap, ThrottleExecutor{}, *e, ctx.executeExcessPercent) + + if ctx.EvictGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Infof("There is a metric usage missed") + highestPriorityMetric := e.EvictWaterLine.GetHighestPriorityEvictAbleMetric() + if highestPriorityMetric != "" { + errPodKeys = e.evictPods(ctx, &totalReleased, highestPriorityMetric) } - - err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String()) - klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err) - return + } else { + // The metrics in EvictGapToWaterLines are can be EvictQuantified and has current usage, then evict precisely + var released ReleaseResource + wg := sync.WaitGroup{} + for _, m := range metricsEvictQuantified { + klog.V(6).Infof("Evict precisely on metric %s", m) + if metricMap[m].SortAble { + metricMap[m].SortFunc(e.EvictPods) + } else { + execsort.GeneralSorter(e.EvictPods) + } + + klog.V(6).Info("After sort, the sequence to evict is ") + for _, pc := range e.EvictPods { + klog.V(6).Info(pc.PodKey.String()) + } + + for !ctx.EvictGapToWaterLines.TargetGapsRemoved(m) { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.EvictGapToWaterLines[m]) + if podinfo.HasNoExecutedPod(e.EvictPods) { + index := podinfo.GetFirstNoExecutedPod(e.EvictPods) + errKeys, released = metricMap[m].EvictFunc(&wg, ctx, index, &totalReleased, e.EvictPods) + errPodKeys = append(errPodKeys, errKeys...) 
+ klog.V(6).Infof("Evict pods %s, released %f resource", e.EvictPods[index].PodKey, released[m]) + + e.EvictPods[index].HasBeenActioned = true + ctx.EvictGapToWaterLines[m] -= released[m] + } else { + klog.V(6).Info("There is no pod that can be evicted") + break + } + } } - - metrics.ExecutorEvictCountsInc() - - klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod)) - }(e.EvictPods[i]) + wg.Wait() + } } - wg.Wait() - - if !bSucceed { + if len(errPodKeys) != 0 { return fmt.Errorf("some pod evict failed,err: %s", strings.Join(errPodKeys, ";")) } @@ -101,3 +127,13 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { func (e *EvictExecutor) Restore(ctx *ExecuteContext) error { return nil } + +func (e *EvictExecutor) evictPods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource, m WaterLineMetric) (errPodKeys []string) { + wg := sync.WaitGroup{} + for i := range e.EvictPods { + errKeys, _ := metricMap[m].EvictFunc(&wg, ctx, i, totalReleasedResource, e.EvictPods) + errPodKeys = append(errPodKeys, errKeys...) + } + wg.Wait() + return +} diff --git a/pkg/ensurance/executor/executor.go b/pkg/ensurance/executor/executor.go index 781a354f3..2280bc290 100644 --- a/pkg/ensurance/executor/executor.go +++ b/pkg/ensurance/executor/executor.go @@ -3,10 +3,7 @@ package executor import ( "time" - "github.com/gocrane/crane/pkg/known" - "github.com/gocrane/crane/pkg/metrics" "google.golang.org/grpc" - coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -14,8 +11,12 @@ import ( pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" + "github.com/gocrane/crane/pkg/common" cgrpc "github.com/gocrane/crane/pkg/ensurance/grpc" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" + "github.com/gocrane/crane/pkg/known" + "github.com/gocrane/crane/pkg/metrics" + "github.com/gocrane/crane/pkg/utils" ) type ActionExecutor struct { @@ -30,11 +31,15 @@ type ActionExecutor struct { runtimeClient pb.RuntimeServiceClient runtimeConn *grpc.ClientConn + + stateMap map[string][]common.TimeSeries + + executeExcessPercent float64 } // NewActionExecutor create enforcer manager func NewActionExecutor(client clientset.Interface, nodeName string, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, - noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string) *ActionExecutor { + noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string, stateMap map[string][]common.TimeSeries, executeExcess string) *ActionExecutor { runtimeClient, runtimeConn, err := cruntime.GetRuntimeClient(runtimeEndpoint) if err != nil { @@ -42,16 +47,24 @@ func NewActionExecutor(client clientset.Interface, nodeName string, podInformer return nil } + executeExcessPercent, err := utils.ParsePercentage(executeExcess) + if err != nil || executeExcessPercent > 100 { + klog.Errorf("Parse executeExcess failed %s", err.Error()) + return nil + } + return &ActionExecutor{ - nodeName: nodeName, - client: client, - noticeCh: noticeCh, - podLister: podInformer.Lister(), - podSynced: podInformer.Informer().HasSynced, - nodeLister: nodeInformer.Lister(), - nodeSynced: nodeInformer.Informer().HasSynced, - runtimeClient: runtimeClient, - runtimeConn: runtimeConn, + nodeName: nodeName, + client: client, + noticeCh: noticeCh, + podLister: podInformer.Lister(), + podSynced: podInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeSynced: nodeInformer.Informer().HasSynced, + runtimeClient: runtimeClient, + runtimeConn: 
runtimeConn, + stateMap: stateMap, + executeExcessPercent: executeExcessPercent, } } @@ -98,12 +111,14 @@ func (a *ActionExecutor) Run(stop <-chan struct{}) { func (a *ActionExecutor) execute(ae AvoidanceExecutor, _ <-chan struct{}) error { var ctx = &ExecuteContext{ - NodeName: a.nodeName, - Client: a.client, - PodLister: a.podLister, - NodeLister: a.nodeLister, - RuntimeClient: a.runtimeClient, - RuntimeConn: a.runtimeConn, + NodeName: a.nodeName, + Client: a.client, + PodLister: a.podLister, + NodeLister: a.nodeLister, + RuntimeClient: a.runtimeClient, + RuntimeConn: a.runtimeConn, + stateMap: ae.StateMap, + executeExcessPercent: a.executeExcessPercent, } //step1 do enforcer actions diff --git a/pkg/ensurance/executor/interface.go b/pkg/ensurance/executor/interface.go index 5b39921a2..b51016271 100644 --- a/pkg/ensurance/executor/interface.go +++ b/pkg/ensurance/executor/interface.go @@ -5,6 +5,8 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + + "github.com/gocrane/crane/pkg/common" ) type Executor interface { @@ -16,6 +18,7 @@ type AvoidanceExecutor struct { ScheduleExecutor ScheduleExecutor ThrottleExecutor ThrottleExecutor EvictExecutor EvictExecutor + StateMap map[string][]common.TimeSeries } type ExecuteContext struct { @@ -25,4 +28,16 @@ type ExecuteContext struct { NodeLister corelisters.NodeLister RuntimeClient pb.RuntimeServiceClient RuntimeConn *grpc.ClientConn + + // Gaps for EvictAble/ThrottleAble metrics + // Key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleDown action) + ThrottoleDownGapToWaterLines GapToWaterLines + // Key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleUp action) + ThrottoleUpGapToWaterLines GapToWaterLines + // Key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use evict action) + EvictGapToWaterLines GapToWaterLines + + stateMap map[string][]common.TimeSeries + + executeExcessPercent float64 } diff --git a/pkg/ensurance/executor/metric.go b/pkg/ensurance/executor/metric.go new file mode 100644 index 000000000..50f5e1b66 --- /dev/null +++ b/pkg/ensurance/executor/metric.go @@ -0,0 +1,55 @@ +package executor + +import ( + "sync" + + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" +) + +type metric struct { + // Should be consistent with metrics in collector/types/types.go + Name WaterLineMetric + + // ActionPriority describes the priority of the metric; it is used to choose the highest-priority metric that can be throttled or evicted + // when MetricsNotThrottleQuantified is not empty in the executor process; + // The range is 0 to 10, 10 is the highest, 0 is the lowest; + // Incompressible metrics such as memory usage can be given a higher priority + ActionPriority int + + SortAble bool + SortFunc func(pods []podinfo.PodContext) + + ThrottleAble bool + ThrottleQuantified bool + ThrottleFunc func(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) + RestoreFunc func(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) + + EvictAble bool + EvictQuantified bool + // If a goroutine is used to evict, make sure to calculate the released resources outside the goroutine + EvictFunc func(wg 
*sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) +} + +var metricMap = make(map[WaterLineMetric]metric) + +func registerMetricMap(m metric) { + metricMap[m.Name] = m +} + +func GetThrottleAbleMetricName() (throttleAbleMetricList []WaterLineMetric) { + for _, m := range metricMap { + if m.ThrottleAble { + throttleAbleMetricList = append(throttleAbleMetricList, m.Name) + } + } + return +} + +func GetEvictAbleMetricName() (evictAbleMetricList []WaterLineMetric) { + for _, m := range metricMap { + if m.EvictAble { + evictAbleMetricList = append(evictAbleMetricList, m.Name) + } + } + return +} diff --git a/pkg/ensurance/executor/pod-info/pod_info.go b/pkg/ensurance/executor/pod-info/pod_info.go new file mode 100644 index 000000000..067181e86 --- /dev/null +++ b/pkg/ensurance/executor/pod-info/pod_info.go @@ -0,0 +1,244 @@ +package pod_info + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" + + ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1" + "github.com/gocrane/crane/pkg/common" + stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" + "github.com/gocrane/crane/pkg/utils" +) + +type ClassAndPriority struct { + PodQOSClass v1.PodQOSClass + PriorityClassValue int32 +} + +type PodType string + +const ( + ThrottleDown PodType = "ThrottleDown" + ThrottleUp PodType = "ThrottleUp" + Evict PodType = "Evict" +) + +type ContainerUsage struct { + ContainerName string + ContainerId string + Value float64 +} + +func GetUsageById(usages []ContainerUsage, containerId string) (ContainerUsage, error) { + for _, v := range usages { + if v.ContainerId == containerId { + return v, nil + } + } + + return ContainerUsage{}, fmt.Errorf("containerUsage not found") +} + +func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerUsage) { + var podUsage = 0.0 + var containerUsages []ContainerUsage + var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} + state, ok := stateMap[metricName] + if !ok { + return podUsage, containerUsages + } + for _, vv := range state { + var labelMaps = common.Labels2Maps(vv.Labels) + if utils.ContainMaps(labelMaps, podMaps) { + if labelMaps[common.LabelNameContainerId] == "" { + podUsage = vv.Samples[0].Value + } else { + containerUsages = append(containerUsages, ContainerUsage{ContainerId: labelMaps[common.LabelNameContainerId], + ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) + } + } + } + + return podUsage, containerUsages +} + +type CPURatio struct { + //the min of cpu ratio for pods + MinCPURatio uint64 `json:"minCPURatio,omitempty"` + + //the step of cpu share and limit for once down-size (1-100) + StepCPURatio uint64 `json:"stepCPURatio,omitempty"` +} + +type MemoryThrottleExecutor struct { + // to force gc the page cache of low level pods + ForceGC bool `json:"forceGC,omitempty"` +} + +type PodContext struct { + PodKey types.NamespacedName + ClassAndPriority ClassAndPriority + PodCPUUsage float64 + ContainerCPUUsages []ContainerUsage + PodCPUShare float64 + ContainerCPUShares []ContainerUsage + PodCPUQuota float64 + ContainerCPUQuotas []ContainerUsage + PodCPUPeriod float64 + ContainerCPUPeriods []ContainerUsage + 
ExtCpuBeUsed bool + ExtCpuLimit int64 + ExtCpuRequest int64 + StartTime *metav1.Time + + PodType PodType + + CPUThrottle CPURatio + MemoryThrottle MemoryThrottleExecutor + + DeletionGracePeriodSeconds *int32 + + HasBeenActioned bool +} + +func HasNoExecutedPod(pods []PodContext) bool { + for _, p := range pods { + if p.HasBeenActioned == false { + return true + } + } + return false +} + +func GetFirstNoExecutedPod(pods []PodContext) int { + for index, p := range pods { + if p.HasBeenActioned == false { + return index + } + } + return -1 +} + +func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction, podType PodType) PodContext { + var podContext PodContext + + podContext.ClassAndPriority = ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)} + podContext.PodKey = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + + podContext.PodCPUUsage, podContext.ContainerCPUUsages = GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod) + podContext.PodCPUShare, podContext.ContainerCPUShares = GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod) + podContext.PodCPUQuota, podContext.ContainerCPUQuotas = GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod) + podContext.PodCPUPeriod, podContext.ContainerCPUPeriods = GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod) + podContext.ExtCpuBeUsed, podContext.ExtCpuLimit, podContext.ExtCpuRequest = utils.ExtResourceAllocated(pod, v1.ResourceCPU) + podContext.StartTime = pod.Status.StartTime + + if action.Spec.Throttle != nil { + podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) + podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) + } + + podContext.PodType = podType + + return podContext +} + +func CompareClassAndPriority(a, b ClassAndPriority) int32 { + qosClassCmp := comparePodQosClass(a.PodQOSClass, b.PodQOSClass) + if qosClassCmp != 0 { + return qosClassCmp + } + if a.PriorityClassValue == b.PriorityClassValue { + return 0 + } else if a.PriorityClassValue < b.PriorityClassValue { + return -1 + } + return 1 +} + +func (s ClassAndPriority) Less(i ClassAndPriority) bool { + if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == 1 { + return false + } + + if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == -1 { + return true + } + + return s.PriorityClassValue < i.PriorityClassValue +} + +func (s ClassAndPriority) Greater(i ClassAndPriority) bool { + if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == 1 { + return true + } + + if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == -1 { + return false + } + + return s.PriorityClassValue > i.PriorityClassValue +} + +func GetMaxQOSPriority(podLister corelisters.PodLister, podTypes []types.NamespacedName) (types.NamespacedName, ClassAndPriority) { + + var podType types.NamespacedName + var scheduledQOSPriority ClassAndPriority + + for _, podNamespace := range podTypes { + if pod, err := podLister.Pods(podNamespace.Namespace).Get(podNamespace.Name); err != nil { + klog.V(6).Infof("Warning: getMaxQOSPriority get pod %s not found", podNamespace.String()) + continue + } else { + var priority = ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0) - 1} + if priority.Greater(scheduledQOSPriority) { + scheduledQOSPriority = priority + podType = podNamespace + } 
+ } + } + + return podType, scheduledQOSPriority +} + +// We defined guaranteed is the highest qos class, burstable is the middle level +// bestEffort is the lowest +// if a qos class is greater than b, return 1 +// if a qos class is less than b, return -1 +// if a qos class equal with b , return 0 +func comparePodQosClass(a v1.PodQOSClass, b v1.PodQOSClass) int32 { + switch b { + case v1.PodQOSGuaranteed: + if a == v1.PodQOSGuaranteed { + return 0 + } else { + return -1 + } + case v1.PodQOSBurstable: + if a == v1.PodQOSGuaranteed { + return 1 + } else if a == v1.PodQOSBurstable { + return 0 + } else { + return -1 + } + case v1.PodQOSBestEffort: + if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) { + return 1 + } else if a == v1.PodQOSBestEffort { + return 0 + } else { + return -1 + } + default: + if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) || (a == v1.PodQOSBestEffort) { + return 1 + } else { + return 0 + } + } +} diff --git a/pkg/ensurance/executor/release_resource.go b/pkg/ensurance/executor/release_resource.go new file mode 100644 index 000000000..fe245b5ce --- /dev/null +++ b/pkg/ensurance/executor/release_resource.go @@ -0,0 +1,12 @@ +package executor + +type ReleaseResource map[WaterLineMetric]float64 + +func (r ReleaseResource) Add(new ReleaseResource) { + for metric, value := range new { + if _, ok := r[metric]; !ok { + r[metric] = 0.0 + } + r[metric] += value + } +} diff --git a/pkg/ensurance/executor/schedule.go b/pkg/ensurance/executor/schedule.go index 9d5781b5d..663ad227e 100644 --- a/pkg/ensurance/executor/schedule.go +++ b/pkg/ensurance/executor/schedule.go @@ -3,13 +3,12 @@ package executor import ( "time" - "github.com/gocrane/crane/pkg/metrics" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" "github.com/gocrane/crane/pkg/known" + "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" ) @@ -17,18 +16,9 @@ const ( DefaultCoolDownSeconds = 300 ) -type ComparablePod struct { - *v1.Pod -} - type ScheduleExecutor struct { - DisableClassAndPriority *ClassAndPriority - RestoreClassAndPriority *ClassAndPriority -} - -type ClassAndPriority struct { - PodQOSClass v1.PodQOSClass - PriorityClassValue int32 + DisableClassAndPriority *podinfo.ClassAndPriority + RestoreClassAndPriority *podinfo.ClassAndPriority } func (b *ScheduleExecutor) Avoid(ctx *ExecuteContext) error { @@ -90,94 +80,3 @@ func (b *ScheduleExecutor) Restore(ctx *ExecuteContext) error { return nil } - -func (p *ComparablePod) Less(p2 ComparablePod) bool { - if comparePodQos(p.Status.QOSClass, p2.Status.QOSClass) == 1 { - return false - } - - return *p.Spec.Priority < *p2.Spec.Priority -} - -func (s ClassAndPriority) Less(i ClassAndPriority) bool { - if comparePodQos(s.PodQOSClass, i.PodQOSClass) == 1 { - return false - } - - if comparePodQos(s.PodQOSClass, i.PodQOSClass) == -1 { - return true - } - - return s.PriorityClassValue < i.PriorityClassValue -} - -func (s ClassAndPriority) Greater(i ClassAndPriority) bool { - if comparePodQos(s.PodQOSClass, i.PodQOSClass) == 1 { - return true - } - - if comparePodQos(s.PodQOSClass, i.PodQOSClass) == -1 { - return false - } - - return s.PriorityClassValue > i.PriorityClassValue -} - -func GetMaxQOSPriority(podLister corelisters.PodLister, podTypes []types.NamespacedName) (types.NamespacedName, ClassAndPriority) { - - var podType types.NamespacedName - var scheduledQOSPriority 
ClassAndPriority - - for _, podNamespace := range podTypes { - if pod, err := podLister.Pods(podNamespace.Namespace).Get(podNamespace.Name); err != nil { - klog.V(6).Infof("Warning: getMaxQOSPriority get pod %s not found", podNamespace.String()) - continue - } else { - var priority = ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0) - 1} - if priority.Greater(scheduledQOSPriority) { - scheduledQOSPriority = priority - podType = podNamespace - } - } - } - - return podType, scheduledQOSPriority -} - -// We defined guaranteed is the highest qos class, burstable is the middle level -// bestEffort is the lowest -// if a qos class is greater than b, return 1 -// if a qos class is less than b, return -1 -// if a qos class equal with b , return 0 -func comparePodQos(a v1.PodQOSClass, b v1.PodQOSClass) int32 { - switch b { - case v1.PodQOSGuaranteed: - if a == v1.PodQOSGuaranteed { - return 0 - } else { - return -1 - } - case v1.PodQOSBurstable: - if a == v1.PodQOSGuaranteed { - return 1 - } else if a == v1.PodQOSBurstable { - return 0 - } else { - return -1 - } - case v1.PodQOSBestEffort: - if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) { - return 1 - } else if a == v1.PodQOSBestEffort { - return 0 - } else { - return -1 - } - default: - if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) || (a == v1.PodQOSBestEffort) { - return 1 - } else { - return 0 - } - } -} diff --git a/pkg/ensurance/executor/sort/cpu_usage_sort.go b/pkg/ensurance/executor/sort/cpu_usage_sort.go new file mode 100644 index 000000000..08acb1f99 --- /dev/null +++ b/pkg/ensurance/executor/sort/cpu_usage_sort.go @@ -0,0 +1,47 @@ +package sort + +import ( + v1 "k8s.io/api/core/v1" + + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + "github.com/gocrane/crane/pkg/utils" +) + +func CpuUsageSorter(pods []podinfo.PodContext) { + orderedBy(classAndPriority, cpuUsage, extCpuUsage, runningTime).Sort(pods) +} + +// extCpuUsage compares the partition of extcpu usage to extcpu limit +func extCpuUsage(p1, p2 podinfo.PodContext) int32 { + // if both pod don't use ext resource, then return + if p1.ExtCpuBeUsed == false && p2.ExtCpuBeUsed == false { + return 0 + } + + p1Ratio := p1.PodCPUUsage / float64(p1.ExtCpuLimit) + p2Ratio := p2.PodCPUUsage / float64(p2.ExtCpuLimit) + + return utils.CmpFloat(p1Ratio, p2Ratio) +} + +// cpuUsage compares the partition extcpu usage of extcpu limit +func cpuUsage(p1, p2 podinfo.PodContext) int32 { + var p1usage, p2usage float64 + // if both pod is PodQOSBestEffort, then compare the absolute usage;otherwise, cmpare the ratio compared with PodCPUQuota + if p1.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort && p2.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort { + p1usage = p1.PodCPUUsage + p2usage = p2.PodCPUUsage + } else { + p1usage = p1.PodCPUUsage * p1.PodCPUPeriod / p1.PodCPUQuota + p2usage = p2.PodCPUUsage * p2.PodCPUPeriod / p2.PodCPUQuota + } + return utils.CmpFloat(p1usage, p2usage) +} + +// extCpuBeUsed compares pod by using ext resource whether +func extCpuBeUsed(p1, p2 podinfo.PodContext) int32 { + use1 := utils.Bool2Uint(p1.ExtCpuBeUsed) + use2 := utils.Bool2Uint(p2.ExtCpuBeUsed) + + return int32(use1 - use2) +} diff --git a/pkg/ensurance/executor/sort/general_sort.go b/pkg/ensurance/executor/sort/general_sort.go new file mode 100644 index 000000000..1a509ce92 --- /dev/null +++ b/pkg/ensurance/executor/sort/general_sort.go @@ -0,0 +1,7 @@ +package sort + +import podinfo 
"github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + +func GeneralSorter(pods []podinfo.PodContext) { + orderedBy(classAndPriority, runningTime).Sort(pods) +} diff --git a/pkg/ensurance/executor/sort/mem_metrics_sort.go b/pkg/ensurance/executor/sort/mem_metrics_sort.go new file mode 100644 index 000000000..9fa01e742 --- /dev/null +++ b/pkg/ensurance/executor/sort/mem_metrics_sort.go @@ -0,0 +1,9 @@ +package sort + +import podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + +// Todo: Memory metrics related sort func need to be filled + +func MemMetricsSorter(pods []podinfo.PodContext) { + orderedBy(classAndPriority, runningTime).Sort(pods) +} diff --git a/pkg/ensurance/executor/sort/sort.go b/pkg/ensurance/executor/sort/sort.go new file mode 100644 index 000000000..3c5d1a0a8 --- /dev/null +++ b/pkg/ensurance/executor/sort/sort.go @@ -0,0 +1,120 @@ +package sort + +import ( + "sort" + + "k8s.io/klog/v2" + + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" +) + +// RankFunc sorts the pods +type RankFunc func(pods []podinfo.PodContext) + +var sortFunc = map[string]func(p1, p2 podinfo.PodContext) int32{ + "ExtCpuBeUsed": extCpuBeUsed, + "ClassAndPriority": classAndPriority, + "ExtCpuUsage": extCpuUsage, + "CpuUsage": cpuUsage, + "RunningTime": runningTime, +} + +func RankFuncConstruct(customize []string) RankFunc { + if len(customize) == 0 { + klog.Fatal("If customize sort func is defined, it can't be empty.") + } + var rankFunc RankFunc + if len(customize) != 0 { + cmp := []cmpFunc{} + for _, f := range customize { + if f, ok := sortFunc[f]; ok { + cmp = append(cmp, f) + } + rankFunc = orderedBy(cmp...).Sort + } + } else { + rankFunc = CpuUsageSorter + } + + return rankFunc +} + +// runningTime compares pods by pod's start time +func runningTime(p1, p2 podinfo.PodContext) int32 { + t1 := p1.StartTime + t2 := p2.StartTime + + if t1.Before(t2) { + return 1 + } else if t1.Equal(t2) { + return 0 + } + + if t2 == nil { + return 1 + } + return -1 +} + +// classAndPriority compares pods by pod's ClassAndPriority +func classAndPriority(p1, p2 podinfo.PodContext) int32 { + return podinfo.CompareClassAndPriority(p1.ClassAndPriority, p2.ClassAndPriority) +} + +// Cmp compares p1 and p2 and returns: +// +// -1 if p1 < p2 +// 0 if p1 == p2 +// +1 if p1 > p2 +// +type cmpFunc func(p1, p2 podinfo.PodContext) int32 + +// podSorter implements the Sort interface, sorting changes within. +type podSorter struct { + pods []podinfo.PodContext + cmp []cmpFunc +} + +// Sort sorts the argument slice according to the less functions passed to orderedBy. +func (ms *podSorter) Sort(pods []podinfo.PodContext) { + ms.pods = pods + sort.Sort(ms) +} + +// orderedBy returns a Sorter that sorts using the cmp functions, in order. +// Call its Sort method to sort the data. +func orderedBy(cmp ...cmpFunc) *podSorter { + return &podSorter{ + cmp: cmp, + } +} + +// Len is part of sort.Interface. +func (ms *podSorter) Len() int { + return len(ms.pods) +} + +// Swap is part of sort.Interface. +func (ms *podSorter) Swap(i, j int) { + ms.pods[i], ms.pods[j] = ms.pods[j], ms.pods[i] +} + +// Less is part of sort.Interface. 
+func (ms *podSorter) Less(i, j int) bool { + p1, p2 := ms.pods[i], ms.pods[j] + var k int + for k = 0; k < len(ms.cmp)-1; k++ { + cmpResult := ms.cmp[k](p1, p2) + // p1 is less than p2 + if cmpResult < 0 { + return true + } + // p1 is greater than p2 + if cmpResult > 0 { + return false + } + // we don't know yet + } + // the last cmp func is the final decider + return ms.cmp[k](p1, p2) < 0 +} diff --git a/pkg/ensurance/executor/throttle.go b/pkg/ensurance/executor/throttle.go index 2f86e62d3..fd55248fd 100644 --- a/pkg/ensurance/executor/throttle.go +++ b/pkg/ensurance/executor/throttle.go @@ -5,15 +5,13 @@ import ( "strings" "time" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - "github.com/gocrane/crane/pkg/common" - cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" - "github.com/gocrane/crane/pkg/utils" ) @@ -25,19 +23,16 @@ const ( type ThrottleExecutor struct { ThrottleDownPods ThrottlePods ThrottleUpPods ThrottlePods + // All metrics (not only those that can be quantified) mentioned in the triggered NodeQOSEnsurancePolicy and their corresponding waterlines + ThrottleDownWaterLine WaterLines + ThrottleUpWaterLine WaterLines } -type ThrottlePods []ThrottlePod - -func (t ThrottlePods) Len() int { return len(t) } -func (t ThrottlePods) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t ThrottlePods) Less(i, j int) bool { - return t[i].PodQOSPriority.Less(t[j].PodQOSPriority) -} +type ThrottlePods []podinfo.PodContext func (t ThrottlePods) Find(podTypes types.NamespacedName) int { for i, v := range t { - if v.PodTypes == podTypes { + if v.PodKey == podTypes { return i } } @@ -45,48 +40,13 @@ func (t ThrottlePods) Find(podTypes types.NamespacedName) int { return -1 } -type CPURatio struct { - //the min of cpu ratio for pods - MinCPURatio uint64 `json:"minCPURatio,omitempty"` - - //the step of cpu share and limit for once down-size (1-100) - StepCPURatio uint64 `json:"stepCPURatio,omitempty"` -} - -type MemoryThrottleExecutor struct { - // to force gc the page cache of low level pods - ForceGC bool `json:"forceGC,omitempty"` -} - -type ThrottlePod struct { - CPUThrottle CPURatio - MemoryThrottle MemoryThrottleExecutor - PodTypes types.NamespacedName - PodCPUUsage float64 - ContainerCPUUsages []ContainerUsage - PodCPUShare float64 - ContainerCPUShares []ContainerUsage - PodCPUQuota float64 - ContainerCPUQuotas []ContainerUsage - PodCPUPeriod float64 - ContainerCPUPeriods []ContainerUsage - PodQOSPriority ClassAndPriority -} - -type ContainerUsage struct { - ContainerName string - ContainerId string - Value float64 -} - -func GetUsageById(usages []ContainerUsage, containerId string) (ContainerUsage, error) { - for _, v := range usages { - if v.ContainerId == containerId { - return v, nil - } +func Reverse(t ThrottlePods) []podinfo.PodContext { + throttle := []podinfo.PodContext(t) + l := len(throttle) + for i := 0; i < l/2; i++ { + throttle[i], throttle[l-i-1] = throttle[l-i-1], throttle[i] } - - return ContainerUsage{}, fmt.Errorf("containerUsage not found") + return throttle } func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { @@ -94,7 +54,7 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { metrics.UpdateLastTimeWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle),
metrics.StepAvoid, start) defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepAvoid, start) - klog.V(6).Info("ThrottleExecutor avoid, %v", *t) + klog.V(6).Infof("ThrottleExecutor avoid, %#v", *t) if len(t.ThrottleDownPods) == 0 { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepAvoid, 0) @@ -103,90 +63,81 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { metrics.ExecutorStatusCounterInc(metrics.SubComponentThrottle, metrics.StepAvoid) } - var bSucceed = true - var errPodKeys []string - - for _, throttlePod := range t.ThrottleDownPods { - - pod, err := ctx.PodLister.Pods(throttlePod.PodTypes.Namespace).Get(throttlePod.PodTypes.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", throttlePod.PodTypes.String())) - continue + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + totalReleased := ReleaseResource{} + + /* The steps to throttle: + 1. If ThrottleDownWaterLine has metrics that can't be quantified, select a throttleable metric which has the highest action priority, use its ThrottleFunc to throttle all ThrottleDownPods, then return + 2. Get the gaps between current usage and waterlines + 2.1 If there is a metric that can't get current usage, select a throttleable metric which has the highest action priority, use its ThrottleFunc to throttle all ThrottleDownPods, then return + 2.2 Traverse metrics that can be quantified, and if there is a gap for the metric, sort candidate pods by the metric's SortFunc if it exists, otherwise use GeneralSorter by default. + Then throttle the sorted pods one by one until there is no gap to the waterline + */ + metricsThrottleQuantified, MetricsNotThrottleQuantified := t.ThrottleDownWaterLine.DivideMetricsByThrottleQuantified() + + // There is a metric that can't be ThrottleQuantified, so throttle all selected pods + if len(MetricsNotThrottleQuantified) != 0 { + klog.V(6).Info("ThrottleDown: There is a metric that can't be ThrottleQuantified") + + highestPriorityMetric := t.ThrottleDownWaterLine.GetHighestPriorityThrottleAbleMetric() + if highestPriorityMetric != "" { + klog.V(6).Infof("The highestPriorityMetric is %s", highestPriorityMetric) + errPodKeys = t.throttlePods(ctx, &totalReleased, highestPriorityMetric) } - - for _, v := range throttlePod.ContainerCPUUsages { - // pause container to skip - if v.ContainerName == "" { - continue - } - - klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName) - - containerCPUQuota, err := GetUsageById(throttlePod.ContainerCPUQuotas, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - containerCPUPeriod, err := GetUsageById(throttlePod.ContainerCPUPeriods, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - container, err := utils.GetPodContainerByName(pod, v.ContainerName) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - var containerCPUQuotaNew float64 - if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { - containerCPUQuotaNew = v.Value * (1.0 - float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) - } else { - containerCPUQuotaNew =
containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 - float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) - } - - if requestCPU, ok := container.Resources.Requests[v1.ResourceCPU]; ok { - if float64(requestCPU.MilliValue())/CpuQuotaCoefficient > containerCPUQuotaNew { - containerCPUQuotaNew = float64(requestCPU.MilliValue()) / CpuQuotaCoefficient + } else { + ctx.ThrottoleDownGapToWaterLines, _, _ = buildGapToWaterLine(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent) + + if ctx.ThrottoleDownGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Info("There is a metric usage missed") + highestPriorityMetric := t.ThrottleDownWaterLine.GetHighestPriorityThrottleAbleMetric() + if highestPriorityMetric != "" { + errPodKeys = t.throttlePods(ctx, &totalReleased, highestPriorityMetric) + } + } else { + // The metrics in ThrottoleDownGapToWaterLines are all in WaterLineMetricsCanBeQuantified and has current usage, then throttle precisely + var released ReleaseResource + for _, m := range metricsThrottleQuantified { + klog.V(6).Infof("ThrottleDown precisely on metric %s", m) + if metricMap[m].SortAble { + metricMap[m].SortFunc(t.ThrottleDownPods) + } else { + execsort.GeneralSorter(t.ThrottleDownPods) } - } - if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { - if float64(limitCPU.MilliValue())/CpuQuotaCoefficient*float64(throttlePod.CPUThrottle.MinCPURatio)/MaxRatio > containerCPUQuotaNew { - containerCPUQuotaNew = float64(limitCPU.MilliValue()) * float64(throttlePod.CPUThrottle.MinCPURatio) / CpuQuotaCoefficient + klog.V(6).Info("After sort, the sequence to throttle is ") + for _, pc := range t.ThrottleDownPods { + klog.V(6).Info(pc.PodKey.String(), pc.ContainerCPUUsages) } - } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f", - containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) + for index := 0; !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleDownPods); index++ { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleDownGapToWaterLines[m]) - if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) - if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", throttlePod.PodTypes.String(), v.ContainerName, err)) - bSucceed = false - continue - } else { - klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", - klog.KObj(pod), v.ContainerName, containerCPUQuotaNew) + errKeys, released = metricMap[m].ThrottleFunc(ctx, index, t.ThrottleDownPods, &totalReleased) + klog.V(6).Infof("ThrottleDown pods %s, released %f resource", t.ThrottleDownPods[index].PodKey, released[m]) + errPodKeys = append(errPodKeys, errKeys...) 
+ + ctx.ThrottoleDownGapToWaterLines[m] -= released[m] } } } } - if !bSucceed { + if len(errPodKeys) != 0 { return fmt.Errorf("some pod throttle failed,err: %s", strings.Join(errPodKeys, ";")) } return nil } +func (t *ThrottleExecutor) throttlePods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource, m WaterLineMetric) (errPodKeys []string) { + for i := range t.ThrottleDownPods { + errKeys, _ := metricMap[m].ThrottleFunc(ctx, i, t.ThrottleDownPods, totalReleasedResource) + errPodKeys = append(errPodKeys, errKeys...) + } + return +} + func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { var start = time.Now() metrics.UpdateLastTimeWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepRestore, start) @@ -194,7 +145,7 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { klog.V(6).Info("ThrottleExecutor restore, %v", *t) - if len(t.ThrottleDownPods) == 0 { + if len(t.ThrottleUpPods) == 0 { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepRestore, 0) return nil } @@ -202,121 +153,78 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepRestore, 1.0) metrics.ExecutorStatusCounterInc(metrics.SubComponentThrottle, metrics.StepRestore) - var bSucceed = true - var errPodKeys []string - - for _, throttlePod := range t.ThrottleUpPods { - - pod, err := ctx.PodLister.Pods(throttlePod.PodTypes.Namespace).Get(throttlePod.PodTypes.Name) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, "not found ", throttlePod.PodTypes.String()) - continue + var errPodKeys, errKeys []string + // TODO: totalReleasedResource used for prom metrics + totalReleased := ReleaseResource{} + + /* The step to restore: + 1. If ThrottleUpWaterLine has metrics that can't be quantified, select a throttleable metric which has the highest action priority, use its RestoreFunc to restore all ThrottleUpPods, then return + 2. Get the gaps between current usage and waterlines + 2.1 If there is a metric that can't get current usage, select a throttleable metric which has the highest action priority, use its RestoreFunc to restore all ThrottleUpPods, then return + 2.2 Traverse metrics that can be quantified, if there is a gap for the metric, then sort candidate pods by its SortFunc if exists, otherwise use GeneralSorter by default. 
+ Then restore the sorted pods one by one until there is no gap to the waterline + */ + metricsThrottleQuantified, MetricsNotThrottleQuantified := t.ThrottleUpWaterLine.DivideMetricsByThrottleQuantified() + + // There is a metric that can't be ThrottleQuantified, so restore all selected pods + if len(MetricsNotThrottleQuantified) != 0 { + klog.V(6).Info("ThrottleUp: There is a metric that can't be ThrottleQuantified") + + highestPrioriyMetric := t.ThrottleUpWaterLine.GetHighestPriorityThrottleAbleMetric() + if highestPrioriyMetric != "" { + klog.V(6).Infof("The highestPrioriyMetric is %s", highestPrioriyMetric) + errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric) } - - for _, v := range throttlePod.ContainerCPUUsages { - - // pause container to skip - if v.ContainerName == "" { - continue - } - - klog.V(6).Infof("ThrottleExecutor1 restore container %s/%s", klog.KObj(pod), v.ContainerName) - - containerCPUQuota, err := GetUsageById(throttlePod.ContainerCPUQuotas, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - containerCPUPeriod, err := GetUsageById(throttlePod.ContainerCPUPeriods, v.ContainerId) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - container, err := utils.GetPodContainerByName(pod, v.ContainerName) - if err != nil { - bSucceed = false - errPodKeys = append(errPodKeys, err.Error(), throttlePod.PodTypes.String()) - continue - } - - var containerCPUQuotaNew float64 - if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) { - continue - } else { - containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 + float64(throttlePod.CPUThrottle.StepCPURatio)/MaxRatio) - } - - if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok { - if float64(limitCPU.MilliValue())/CpuQuotaCoefficient < containerCPUQuotaNew { - containerCPUQuotaNew = float64(limitCPU.MilliValue()) / CpuQuotaCoefficient - } - } else { - usage, hasExtRes := utils.GetExtCpuRes(container) - if hasExtRes { - containerCPUQuotaNew = float64(usage.MilliValue()) / CpuQuotaCoefficient - } - if !hasExtRes && containerCPUQuotaNew > MaxUpQuota*containerCPUPeriod.Value/CpuQuotaCoefficient { - containerCPUQuotaNew = -1 + } else { + _, ctx.ThrottoleUpGapToWaterLines, _ = buildGapToWaterLine(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent) + + if ctx.ThrottoleUpGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Info("There is a metric usage missed") + highestPrioriyMetric := t.ThrottleUpWaterLine.GetHighestPriorityThrottleAbleMetric() + if highestPrioriyMetric != "" { + errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric) + } + } else { + // The metrics in ThrottoleUpGapToWaterLines are all in WaterLineMetricsCanBeQuantified and have current usage, so restore precisely + var released ReleaseResource + for _, m := range metricsThrottleQuantified { + klog.V(6).Infof("ThrottleUp precisely on metric %s", m) + if metricMap[m].SortAble { + metricMap[m].SortFunc(t.ThrottleUpPods) + } else { + execsort.GeneralSorter(t.ThrottleUpPods) } + //t.ThrottleUpPods = Reverse(t.ThrottleUpPods) - } + klog.V(6).Info("After sort, the sequence to throttle is ") + for _, pc := range t.ThrottleUpPods { + klog.V(6).Info(pc.PodKey.String()) + } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota
%.2f,containerCPUPeriod %.2f", - klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) + for index := 0; !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleUpGapToWaterLines[m]) - if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { + errKeys, released = metricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased) + klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].PodKey, released[m]) + errPodKeys = append(errPodKeys, errKeys...) - if utils.AlmostEqual(containerCPUQuotaNew, -1) { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)}) - if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), throttlePod.PodTypes.String()) - bSucceed = false - continue - } - } else { - err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) - if err != nil { - klog.Errorf("Failed to updateResource, err %s", err.Error()) - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), throttlePod.PodTypes.String()) - bSucceed = false - continue - } + ctx.ThrottoleUpGapToWaterLines[m] -= released[m] } } } } - if !bSucceed { + if len(errPodKeys) != 0 { return fmt.Errorf("some pod throttle restore failed,err: %s", strings.Join(errPodKeys, ";")) } return nil } -func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerUsage) { - var podUsage = 0.0 - var containerUsages []ContainerUsage - var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} - state, ok := stateMap[metricName] - if !ok { - return podUsage, containerUsages +func (t *ThrottleExecutor) restorePods(ctx *ExecuteContext, totalReleasedResource *ReleaseResource, m WaterLineMetric) (errPodKeys []string) { + for i := range t.ThrottleUpPods { + errKeys, _ := metricMap[m].RestoreFunc(ctx, i, t.ThrottleUpPods, totalReleasedResource) + errPodKeys = append(errPodKeys, errKeys...)
} - for _, vv := range state { - var labelMaps = common.Labels2Maps(vv.Labels) - if utils.ContainMaps(labelMaps, podMaps) { - if labelMaps[common.LabelNameContainerId] == "" { - podUsage = vv.Samples[0].Value - } else { - containerUsages = append(containerUsages, ContainerUsage{ContainerId: labelMaps[common.LabelNameContainerId], - ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) - } - } - } - - return podUsage, containerUsages + return } diff --git a/pkg/ensurance/executor/waterline.go b/pkg/ensurance/executor/waterline.go new file mode 100644 index 000000000..56dfe29db --- /dev/null +++ b/pkg/ensurance/executor/waterline.go @@ -0,0 +1,244 @@ +package executor + +import ( + "math" + "reflect" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + + "github.com/gocrane/crane/pkg/common" + "github.com/gocrane/crane/pkg/ensurance/collector/types" +) + +// Metrics that can be measured for waterLine +// Should be consistent with metrics in collector/types/types.go +type WaterLineMetric string + +// Be consistent with metrics in collector/types/types.go +const ( + CpuUsage = WaterLineMetric(types.MetricNameCpuTotalUsage) + MemUsage = WaterLineMetric(types.MetricNameMemoryTotalUsage) +) + +const ( + // We can't get current use, so can't do actions precisely, just evict every evictedPod + MissedCurrentUsage float64 = math.MaxFloat64 +) + +// An WaterLine is a min-heap of Quantity. The values come from each objectiveEnsurance.metricRule.value +type WaterLine []resource.Quantity + +func (w WaterLine) Len() int { + return len(w) +} + +func (w WaterLine) Swap(i, j int) { + w[i], w[j] = w[j], w[i] +} + +func (w *WaterLine) Push(x interface{}) { + *w = append(*w, x.(resource.Quantity)) +} + +func (w *WaterLine) Pop() interface{} { + old := *w + n := len(old) + x := old[n-1] + *w = old[0 : n-1] + return x +} + +func (w *WaterLine) PopSmallest() *resource.Quantity { + wl := *w + return &wl[0] +} + +func (w WaterLine) Less(i, j int) bool { + cmp := w[i].Cmp(w[j]) + if cmp == -1 { + return true + } + return false +} + +func (w WaterLine) String() string { + str := "" + for i := 0; i < w.Len(); i++ { + str += w[i].String() + str += " " + } + return str +} + +// WaterLines 's key is the metric name, value is waterline which get from each objectiveEnsurance.metricRule.value +type WaterLines map[WaterLineMetric]*WaterLine + +// DivideMetricsByThrottleQuantified divide metrics by whether metrics can be throttleQuantified +func (e WaterLines) DivideMetricsByThrottleQuantified() (MetricsThrottleQuantified []WaterLineMetric, MetricsNotThrottleQuantified []WaterLineMetric) { + for m := range e { + if metricMap[m].ThrottleQuantified == true { + MetricsThrottleQuantified = append(MetricsThrottleQuantified, m) + } else { + MetricsNotThrottleQuantified = append(MetricsNotThrottleQuantified, m) + } + } + return +} + +// DivideMetricsByEvictQuantified divide metrics in waterlines into can be EvictQuantified and can not be EvictQuantified +func (e WaterLines) DivideMetricsByEvictQuantified() (metricsEvictQuantified []WaterLineMetric, metricsNotEvictQuantified []WaterLineMetric) { + for m := range e { + if metricMap[m].EvictQuantified == true { + metricsEvictQuantified = append(metricsEvictQuantified, m) + } else { + metricsNotEvictQuantified = append(metricsNotEvictQuantified, m) + } + } + return +} + +// GetHighestPriorityThrottleAbleMetric get the highest priority in metrics from waterlines +func (e WaterLines) GetHighestPriorityThrottleAbleMetric() (highestPrioriyMetric 
WaterLineMetric) { + highestActionPriority := 0 + for m := range e { + if metricMap[m].ThrottleAble == true { + if metricMap[m].ActionPriority >= highestActionPriority { + highestPrioriyMetric = m + highestActionPriority = metricMap[m].ActionPriority + } + } + } + return +} + +// GetHighestPriorityEvictAbleMetric get the highest priority in metrics that can be EvictAble +func (e WaterLines) GetHighestPriorityEvictAbleMetric() (highestPrioriyMetric WaterLineMetric) { + highestActionPriority := 0 + for m := range e { + if metricMap[m].EvictAble == true { + if metricMap[m].ActionPriority >= highestActionPriority { + highestPrioriyMetric = m + highestActionPriority = metricMap[m].ActionPriority + } + } + } + return +} + +// GapToWaterLines's key is metric name, value is the difference between usage and the smallest waterline +type GapToWaterLines map[WaterLineMetric]float64 + +// Only calculate gap for metrics that can be quantified +func buildGapToWaterLine(stateMap map[string][]common.TimeSeries, + throttleExecutor ThrottleExecutor, evictExecutor EvictExecutor, executeExcessPercent float64) ( + throttleDownGapToWaterLines, throttleUpGapToWaterLines, eviceGapToWaterLines GapToWaterLines) { + + throttleDownGapToWaterLines, throttleUpGapToWaterLines, eviceGapToWaterLines = make(map[WaterLineMetric]float64), make(map[WaterLineMetric]float64), make(map[WaterLineMetric]float64) + + if !reflect.DeepEqual(evictExecutor, EvictExecutor{}) { + // Traverse EvictAbleMetric but not evictExecutor.EvictWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics + // must come from EvictAbleMetrics + for _, m := range GetEvictAbleMetricName() { + // Get the series for each metric + series, ok := stateMap[string(m)] + if !ok { + klog.Warningf("BuildEvictWaterLineGap: Evict Metric %s not found from collector stateMap", string(m)) + // Can't get current usage, so can not do actions precisely, just evict every evictedPod; + eviceGapToWaterLines[m] = MissedCurrentUsage + continue + } + + // Find the biggest used value + var maxUsed float64 + if series[0].Samples[0].Value > maxUsed { + maxUsed = series[0].Samples[0].Value + } + + // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified + evictWaterLine, evictExist := evictExecutor.EvictWaterLine[m] + + // If metric not exist in EvictWaterLine, eviceGapToWaterLines of metric will can't be calculated + if !evictExist { + delete(eviceGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildEvictWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(evictWaterLine.PopSmallest().Value())) + eviceGapToWaterLines[m] = (1 + executeExcessPercent) * (maxUsed - float64(evictWaterLine.PopSmallest().Value())) + } + } + } + + if !reflect.DeepEqual(throttleExecutor, ThrottleExecutor{}) { + // Traverse ThrottleAbleMetricName but not throttleExecutor.ThrottleDownWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics + // must come from ThrottleAbleMetrics + for _, m := range GetThrottleAbleMetricName() { + // Get the series for each metric + series, ok := stateMap[string(m)] + if !ok { + klog.Warningf("BuildThrottleWaterLineGap: Metric %s not found from collector stateMap", string(m)) + // Can't get current usage, so can not do actions precisely, just evict every evictedPod; + throttleDownGapToWaterLines[m] = MissedCurrentUsage + throttleUpGapToWaterLines[m] = MissedCurrentUsage + continue + } + + // Find the biggest used value + var maxUsed float64 + if 
series[0].Samples[0].Value > maxUsed { + maxUsed = series[0].Samples[0].Value + } + + // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified + throttleDownWaterLine, throttleDownExist := throttleExecutor.ThrottleDownWaterLine[m] + throttleUpWaterLine, throttleUpExist := throttleExecutor.ThrottleUpWaterLine[m] + + // If a metric does not exist in ThrottleDownWaterLine, throttleDownGapToWaterLines of this metric cannot be calculated + if !throttleDownExist { + delete(throttleDownGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildThrottleDownWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(throttleDownWaterLine.PopSmallest().Value())) + throttleDownGapToWaterLines[m] = (1 + executeExcessPercent) * (maxUsed - float64(throttleDownWaterLine.PopSmallest().Value())) + } + + // If a metric does not exist in ThrottleUpWaterLine, throttleUpGapToWaterLines of this metric cannot be calculated + if !throttleUpExist { + delete(throttleUpGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildThrottleUpWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(throttleUpWaterLine.PopSmallest().Value())) + // Attention: different from throttleDown and evict, use waterline - used + throttleUpGapToWaterLines[m] = (1 + executeExcessPercent) * (float64(throttleUpWaterLine.PopSmallest().Value()) - maxUsed) + } + } + } + return +} + +// GapsAllRemoved returns true if there is no remaining gap in GapToWaterLines +func (g GapToWaterLines) GapsAllRemoved() bool { + for _, v := range g { + if v > 0 { + return false + } + } + return true +} + +// TargetGapsRemoved returns true if the gap for the specified metric in GapToWaterLines has been removed +func (g GapToWaterLines) TargetGapsRemoved(metric WaterLineMetric) bool { + val, ok := g[metric] + if !ok || val <= 0 { + return true + } + return false +} + +// HasUsageMissedMetric returns true if any metric in GapToWaterLines is missing its current usage +func (g GapToWaterLines) HasUsageMissedMetric() bool { + for m, v := range g { + if v == MissedCurrentUsage { + klog.V(6).Infof("Metric %s usage missed", m) + return true + } + } + return false +} diff --git a/pkg/ensurance/executor/waterline_test.go b/pkg/ensurance/executor/waterline_test.go new file mode 100644 index 000000000..970560cb6 --- /dev/null +++ b/pkg/ensurance/executor/waterline_test.go @@ -0,0 +1,50 @@ +package executor + +import ( + "container/heap" + "strconv" + "testing" + + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/stretchr/testify/assert" +) + +func (w WaterLine) verify(t *testing.T, i int) { + t.Helper() + n := w.Len() + j1 := 2*i + 1 + j2 := 2*i + 2 + if j1 < n { + if w.Less(j1, i) { + t.Errorf("heap invariant invalidated [%d] = %d > [%d] = %d", i, w[i].Value(), j1, w[j1].Value()) + return + } + w.verify(t, j1) + } + if j2 < n { + if w.Less(j2, i) { + t.Errorf("heap invariant invalidated [%d] = %d > [%d] = %d", i, w[i].Value(), j2, w[j2].Value()) + return + } + w.verify(t, j2) + } +} + +// TestPopSmallest makes sure that we can get the smallest value +func TestPopSmallest(t *testing.T) { + h := WaterLine{} + + for i := 20; i > 0; i-- { + heap.Push(&h, resource.MustParse(strconv.Itoa(i)+"m")) + assert.Equal(t, strconv.Itoa(i)+"m", h.PopSmallest().String()) + } + + h.verify(t, 0) + + t.Logf("waterline is %s", h.String()) + for i := 1; h.Len() > 0; i++ { + heap.Pop(&h) + h.verify(t, 0) + } +} diff --git a/pkg/resource/node_resource_manager.go b/pkg/resource/node_resource_manager.go index 7aeb56675..cdb72f2b4 100644 --- a/pkg/resource/node_resource_manager.go +++ b/pkg/resource/node_resource_manager.go @@ -73,13 +73,13
@@ type NodeResourceManager struct { tspName string } -func NewNodeResourceManager(client clientset.Interface, nodeName string, op map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, +func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) (*NodeResourceManager, error) { - reserveCpuPercent, err := utils.ParsePercentage(op[v1.ResourceCPU.String()]) + reserveCpuPercent, err := utils.ParsePercentage(nodeResourceReserved[v1.ResourceCPU.String()]) if err != nil { return nil, err } - reserveMemoryPercent, err := utils.ParsePercentage(op[v1.ResourceMemory.String()]) + reserveMemoryPercent, err := utils.ParsePercentage(nodeResourceReserved[v1.ResourceMemory.String()]) if err != nil { return nil, err } diff --git a/pkg/resource/pod_resource_manger.go b/pkg/resource/pod_resource_manger.go index 87e654115..03dd07d00 100644 --- a/pkg/resource/pod_resource_manger.go +++ b/pkg/resource/pod_resource_manger.go @@ -20,6 +20,7 @@ import ( "github.com/gocrane/crane/pkg/ensurance/collector/cadvisor" stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" "github.com/gocrane/crane/pkg/ensurance/executor" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" cgrpc "github.com/gocrane/crane/pkg/ensurance/grpc" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" "github.com/gocrane/crane/pkg/known" @@ -147,7 +148,7 @@ func (o *PodResourceManager) updatePodExtResToCgroup(pod *v1.Pod) { start := time.Now() metrics.UpdateLastTime(string(known.ModulePodResourceManager), metrics.StepUpdatePodResource, start) - _, containerCPUQuotas := executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), o.state, pod) + _, containerCPUQuotas := podinfo.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), o.state, pod) for _, c := range pod.Spec.Containers { if state := utils.GetContainerStatus(pod, c); state.Running == nil { @@ -163,7 +164,7 @@ func (o *PodResourceManager) updatePodExtResToCgroup(pod *v1.Pod) { } // If container's quota is -1, pod resource manager will convert limit to quota - containerCPUQuota, err := executor.GetUsageById(containerCPUQuotas, containerId) + containerCPUQuota, err := podinfo.GetUsageById(containerCPUQuotas, containerId) if err != nil { klog.Error(err) } @@ -195,7 +196,7 @@ func (o *PodResourceManager) getCPUPeriod(pod *v1.Pod, containerId string) float now := time.Now() if o.state != nil && !now.After(o.lastStateTime.Add(StateExpiration)) { - _, containerCPUPeriods := executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), o.state, pod) + _, containerCPUPeriods := podinfo.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), o.state, pod) for _, period := range containerCPUPeriods { if period.ContainerId == containerId { return period.Value diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 0dfa7e334..71574d91c 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -150,7 +150,7 @@ func CalculatePodTemplateRequests(podTemplate *v1.PodTemplateSpec, resource v1.R // GetExtCpuRes get container's gocrane.io/cpu usage func GetExtCpuRes(container v1.Container) (resource.Quantity, bool) { for res, val := range container.Resources.Limits { - if strings.HasPrefix(res.String(), fmt.Sprintf(ExtResourcePrefixFormat, v1.ResourceCPU)) { + if strings.HasPrefix(res.String(), fmt.Sprintf(ExtResourcePrefixFormat, 
v1.ResourceCPU)) && val.Value() != 0 { return val, true } } @@ -212,3 +212,24 @@ func GetContainerIdFromPod(pod *v1.Pod, containerName string) string { } return "" } + +// Whether pod uses ext resource ext-resource.node.gocrane.io, and value it uses +func ExtResourceAllocated(pod *v1.Pod, resName v1.ResourceName) (hasExtResource bool, extCpuLimit, extCpuRequest int64) { + resPrefix := fmt.Sprintf(ExtResourcePrefixFormat, resName) + for i := range pod.Spec.Containers { + container := pod.Spec.Containers[i] + for res, val := range container.Resources.Limits { + if strings.HasPrefix(res.String(), resPrefix) { + hasExtResource = true + extCpuLimit += val.MilliValue() + } + } + for res, val := range container.Resources.Requests { + if strings.HasPrefix(res.String(), resPrefix) { + hasExtResource = true + extCpuRequest += val.MilliValue() + } + } + } + return +} diff --git a/pkg/utils/ref.go b/pkg/utils/ref.go index 1222fc246..00f342768 100644 --- a/pkg/utils/ref.go +++ b/pkg/utils/ref.go @@ -25,7 +25,7 @@ func GetContainerIdFromKey(key string) string { subPaths := strings.Split(key, "/") if len(subPaths) > 0 { - // if the latest sub path is pod-xxx-xxx, we regard as it od path + // if the latest sub path is pod-xxx-xxx, we regard as it pod path // if not we used the latest sub path as the containerId if strings.HasPrefix(subPaths[len(subPaths)-1], CgroupPodPrefix) { return "" diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 001810355..7e82a11aa 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -74,3 +74,20 @@ func AlmostEqual(a, b float64) bool { func StringPtr(str string) *string { return &str } + +func Bool2Uint(b bool) uint { + if b { + return 1 + } + return 0 +} + +func CmpFloat(p1, p2 float64) int32 { + if AlmostEqual(p1, p2) { + return 0 + } + if p1 < p2 { + return -1 + } + return 1 +}
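Reviewer note: the new sort package chains comparison functions so that earlier criteria decide the order and later ones only break ties. The standalone sketch below shows the same orderedBy pattern with sort.SliceStable; the pod struct and the byPriority/byCPUUsage comparators are simplified stand-ins, not the real podinfo.PodContext or crane's exact ranking.

package main

import (
	"fmt"
	"sort"
)

// pod is a simplified stand-in for podinfo.PodContext.
type pod struct {
	name     string
	priority int32
	cpuUsage float64
}

// cmpFunc returns -1, 0 or 1, mirroring the cmp functions in the sort package.
type cmpFunc func(p1, p2 pod) int32

func byPriority(p1, p2 pod) int32 {
	switch {
	case p1.priority < p2.priority:
		return -1
	case p1.priority > p2.priority:
		return 1
	}
	return 0
}

func byCPUUsage(p1, p2 pod) int32 {
	switch {
	case p1.cpuUsage < p2.cpuUsage:
		return -1
	case p1.cpuUsage > p2.cpuUsage:
		return 1
	}
	return 0
}

// orderedBy chains comparators: the first one that can decide the order wins,
// later ones only break ties.
func orderedBy(cmps ...cmpFunc) func(pods []pod) {
	return func(pods []pod) {
		sort.SliceStable(pods, func(i, j int) bool {
			for _, c := range cmps {
				switch c(pods[i], pods[j]) {
				case -1:
					return true
				case 1:
					return false
				}
			}
			return false
		})
	}
}

func main() {
	pods := []pod{
		{"a", 10, 0.9},
		{"b", 0, 0.1},
		{"c", 0, 0.7},
	}
	// Sort lowest priority first; ties are broken by lower CPU usage.
	orderedBy(byPriority, byCPUUsage)(pods)
	fmt.Println(pods) // [{b 0 0.1} {c 0 0.7} {a 10 0.9}]
}

Using sort.SliceStable keeps the relative order of pods that compare equal on every criterion, which matches the stable multi-key behavior the podSorter above aims for.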
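Reviewer note: the new --execute-excess option inflates the distance between current usage and the lowest waterline before pods are throttled or evicted. A minimal sketch of that arithmetic follows; the usage numbers and per-pod release amounts are made up for illustration (in crane they come from the collector stateMap and each metric's ThrottleFunc).

package main

import "fmt"

func main() {
	const (
		maxUsed       = 5.5  // hypothetical current node CPU usage, in cores
		waterline     = 4.0  // hypothetical smallest waterline from the NodeQOSEnsurancePolicy
		executeExcess = 0.10 // parsed from --execute-excess, e.g. "10%"
	)

	// Same shape as the throttle-down/evict branch of buildGapToWaterLine:
	// gap = (1 + excess) * (usage - waterline).
	gap := (1 + executeExcess) * (maxUsed - waterline)
	fmt.Printf("initial gap: %.2f cores\n", gap)

	// Hypothetical amount each sorted pod releases when throttled.
	released := []float64{0.4, 0.8, 0.3, 0.6}
	for i := 0; gap > 0 && i < len(released); i++ {
		gap -= released[i]
		fmt.Printf("throttled pod %d, remaining gap: %.2f cores\n", i, gap)
	}
	if gap <= 0 {
		fmt.Println("gap removed, stop throttling")
	}
}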
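Reviewer note: each objectiveEnsurance.metricRule.value is pushed onto a per-metric min-heap, so index 0 (PopSmallest) always yields the smallest, i.e. strictest, waterline. A self-contained sketch of that idea, assuming only the k8s.io/apimachinery dependency the project already uses; quantityHeap is an illustrative name, not the WaterLine type itself.

package main

import (
	"container/heap"
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// quantityHeap is a min-heap of resource.Quantity; index 0 is always the smallest,
// which is the property PopSmallest relies on.
type quantityHeap []resource.Quantity

func (h quantityHeap) Len() int            { return len(h) }
func (h quantityHeap) Less(i, j int) bool  { return h[i].Cmp(h[j]) < 0 }
func (h quantityHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *quantityHeap) Push(x interface{}) { *h = append(*h, x.(resource.Quantity)) }
func (h *quantityHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &quantityHeap{}
	// Hypothetical objectiveEnsurance.metricRule.value entries for one metric.
	for _, s := range []string{"500m", "2", "750m"} {
		heap.Push(h, resource.MustParse(s))
	}
	// The smallest value is the effective waterline.
	fmt.Println("effective waterline:", (*h)[0].String()) // 500m
}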