Skip to content

Commit

Permalink
refactor for executor of waterline and sort framework
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyuechen committed May 24, 2022
1 parent 19d8fe7 commit 045b0c9
Show file tree
Hide file tree
Showing 15 changed files with 517 additions and 405 deletions.
24 changes: 12 additions & 12 deletions pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,13 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleDownWaterLine == nil {
e.ThrottleDownWaterLine = make(map[string]*executor.WaterLine)
e.ThrottleDownWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
// Use a heap here, so we don't need to use <nepName>-<MetricRuleName> as value, just use <MetricRuleName>
if e.ThrottleDownWaterLine[ensurance.MetricRule.Name] == nil {
e.ThrottleDownWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
if e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleDownWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
heap.Push(e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}
Expand All @@ -547,12 +547,12 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
e.ThrottleUpWaterLine = make(map[string]*executor.WaterLine)
e.ThrottleUpWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.ThrottleUpWaterLine[ensurance.MetricRule.Name] == nil {
e.ThrottleUpWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
if e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleUpWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
heap.Push(e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}
Expand All @@ -563,12 +563,12 @@ func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.EvictWaterLine == nil {
e.EvictWaterLine = make(map[string]*executor.WaterLine)
e.EvictWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.EvictWaterLine[ensurance.MetricRule.Name] == nil {
e.EvictWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
if e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.EvictWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}
Expand Down
248 changes: 248 additions & 0 deletions pkg/ensurance/executor/cpu_usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package executor

import (
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
"github.com/gocrane/crane/pkg/ensurance/executor/sort"
cruntime "github.com/gocrane/crane/pkg/ensurance/runtime"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
)

func init() {
registerMetricMap(cpu_usage)
}

var cpu_usage = metric{
Name: CpuUsage,
SortAble: true,
SortFunc: sort.CpuUsageSorter,

ThrottleAble: true,
ThrottleQualified: true,
ThrottleFunc: throttleOnePodCpu,
RestoreFunc: restoreOnePodCpu,

EvictAble: true,
EvictQualified: true,
EvictFunc: evictOnePodCpu,
}

func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) {
pod, err := ctx.PodLister.Pods(ThrottleDownPods[index].PodKey.Namespace).Get(ThrottleDownPods[index].PodKey.Name)
if err != nil {
errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", ThrottleDownPods[index].PodKey.String()))
return
}

// Throttle for CPU metrics

for _, v := range ThrottleDownPods[index].ContainerCPUUsages {
// pause container to skip
if v.ContainerName == "" {
continue
}

klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName)

containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String())
continue
}

containerCPUPeriod, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUPeriods, v.ContainerId)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String())
continue
}

container, err := utils.GetPodContainerByName(pod, v.ContainerName)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String())
continue
}

var containerCPUQuotaNew float64
if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) {
containerCPUQuotaNew = v.Value * (1.0 - float64(ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio)
} else {
containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 - float64(ThrottleDownPods[index].CPUThrottle.StepCPURatio)/MaxRatio)
}

if requestCPU, ok := container.Resources.Requests[v1.ResourceCPU]; ok {
if float64(requestCPU.MilliValue())/CpuQuotaCoefficient > containerCPUQuotaNew {
containerCPUQuotaNew = float64(requestCPU.MilliValue()) / CpuQuotaCoefficient
}
}

if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
if float64(limitCPU.MilliValue())/CpuQuotaCoefficient*float64(ThrottleDownPods[index].CPUThrottle.MinCPURatio)/MaxRatio > containerCPUQuotaNew {
containerCPUQuotaNew = float64(limitCPU.MilliValue()) * float64(ThrottleDownPods[index].CPUThrottle.MinCPURatio) / CpuQuotaCoefficient
}
}

klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f",
containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value)

if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) {
err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)})
if err != nil {
errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", ThrottleDownPods[index].PodKey.String(), v.ContainerName, err))
continue
} else {
klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.",
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew)
}
}
released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value)
totalReleasedResource.Add(released)
}
return
}

func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) {
pod, err := ctx.PodLister.Pods(ThrottleUpPods[index].PodKey.Namespace).Get(ThrottleUpPods[index].PodKey.Name)
if err != nil {
errPodKeys = append(errPodKeys, "not found ", ThrottleUpPods[index].PodKey.String())
return
}

// Restore for CPU metric
for _, v := range ThrottleUpPods[index].ContainerCPUUsages {

// pause container to skip
if v.ContainerName == "" {
continue
}

klog.V(6).Infof("ThrottleExecutor restore container %s/%s", klog.KObj(pod), v.ContainerName)

containerCPUQuota, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUQuotas, v.ContainerId)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String())
continue
}

containerCPUPeriod, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUPeriods, v.ContainerId)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String())
continue
}

container, err := utils.GetPodContainerByName(pod, v.ContainerName)
if err != nil {
errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String())
continue
}

var containerCPUQuotaNew float64
if utils.AlmostEqual(containerCPUQuota.Value, -1.0) || utils.AlmostEqual(containerCPUQuota.Value, 0.0) {
continue
} else {
containerCPUQuotaNew = containerCPUQuota.Value / containerCPUPeriod.Value * (1.0 + float64(ThrottleUpPods[index].CPUThrottle.StepCPURatio)/MaxRatio)
}

if limitCPU, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
if float64(limitCPU.MilliValue())/CpuQuotaCoefficient < containerCPUQuotaNew {
containerCPUQuotaNew = float64(limitCPU.MilliValue()) / CpuQuotaCoefficient
}
} else {
usage, hasExtRes := utils.GetExtCpuRes(container)
if hasExtRes {
containerCPUQuotaNew = float64(usage.MilliValue()) / CpuQuotaCoefficient
}
if !hasExtRes && containerCPUQuotaNew > MaxUpQuota*containerCPUPeriod.Value/CpuQuotaCoefficient {
containerCPUQuotaNew = -1
}

}

klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f",
klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value)

if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) {

if utils.AlmostEqual(containerCPUQuotaNew, -1) {
err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)})
if err != nil {
errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String())
continue
}
} else {
err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)})
if err != nil {
klog.Errorf("Failed to updateResource, err %s", err.Error())
errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String())
continue
}
}
}
released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value)
totalReleasedResource.Add(released)
}

return
}

func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) {
wg.Add(1)

go func(evictPod podinfo.PodContext) {
defer wg.Done()

pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name)
if err != nil {
errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String())
return
}

err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds)
if err != nil {
errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String())
klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err)
return
}

metrics.ExecutorEvictCountsInc()

klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod))

// Calculate release resources
released = ConstructCpuUsageRelease(evictPod, 0.0, 0.0)
totalReleasedResource.Add(released)
}(EvictPods[index])
return
}

func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource {
if pod.PodType == podinfo.Evict {
return ReleaseResource{
CpuUsage: pod.PodCPUUsage,
}
}
if pod.PodType == podinfo.ThrottleDown {
reduction := currentContainerCpuUsage - containerCPUQuotaNew
if reduction > 0 {
return ReleaseResource{
CpuUsage: reduction,
}
}
return ReleaseResource{}
}
if pod.PodType == podinfo.ThrottleUp {
reduction := containerCPUQuotaNew - currentContainerCpuUsage
if reduction > 0 {
return ReleaseResource{
CpuUsage: reduction,
}
}
return ReleaseResource{}
}
return ReleaseResource{}
}
Loading

0 comments on commit 045b0c9

Please sign in to comment.