Skip to content

Commit

Permalink
refine cpu sorter of crane agent
Browse files Browse the repository at this point in the history
  • Loading branch information
mfanjie committed Aug 18, 2022
1 parent 5851b67 commit 01ea2e7
Show file tree
Hide file tree
Showing 18 changed files with 415 additions and 427 deletions.
125 changes: 55 additions & 70 deletions pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
"github.com/gocrane/crane/pkg/ensurance/executor"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
Expand Down Expand Up @@ -167,16 +167,16 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
for _, n := range neps {
for _, v := range n.Spec.ObjectiveEnsurances {
var key = strings.Join([]string{n.Name, v.Name}, ".")
ac, err := s.analyze(key, v, state)
actionContext, err := s.analyze(key, v, state)
if err != nil {
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAnalyzeError, key, 1.0)
klog.Errorf("Failed to analyze, %v.", err)
}
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(ac.Triggered)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(ac.Restored)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(actionContext.Triggered)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(actionContext.Restored)))

ac.Nep = n
actionContexts = append(actionContexts, ac)
actionContext.Nep = n
actionContexts = append(actionContexts, actionContext)
}
}

Expand Down Expand Up @@ -222,17 +222,17 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
}

func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsurance, stateMap map[string][]common.TimeSeries) (ecache.ActionContext, error) {
var ac = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName}
var actionContext = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName}

state, ok := stateMap[object.MetricRule.Name]
if !ok {
return ac, fmt.Errorf("metric %s not found", object.MetricRule.Name)
return actionContext, fmt.Errorf("metric %s not found", object.MetricRule.Name)
}

//step1: get series from value
series, err := s.getSeries(state, object.MetricRule.Selector, object.MetricRule.Name)
if err != nil {
return ac, err
return actionContext, err
}

//step2: check if triggered for NodeQOSEnsurance
Expand All @@ -241,9 +241,9 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &ac)
s.computeActionContext(threshold, key, object, &actionContext)

return ac, nil
return actionContext, nil
}

func (s *AnormalyAnalyzer) computeActionContext(threshold bool, key string, object ensuranceapi.ObjectiveEnsurance, ac *ecache.ActionContext) {
Expand Down Expand Up @@ -278,47 +278,47 @@ func (s *AnormalyAnalyzer) filterDryRun(acs []ecache.ActionContext) []ecache.Act
return dcsFiltered
}

func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, acs []ecache.ActionContext) executor.AvoidanceExecutor {
var ae executor.AvoidanceExecutor
func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, actionMap map[string]*ensuranceapi.AvoidanceAction, actionContexts []ecache.ActionContext) executor.AvoidanceExecutor {
var executor executor.AvoidanceExecutor

//step1 filter dry run ActionContext
acsFiltered := s.filterDryRun(acs)
filteredActionContext := s.filterDryRun(actionContexts)

//step2 do DisableScheduled merge
enableSchedule := s.disableSchedulingMerge(acsFiltered, avoidanceMaps, &ae)
s.mergeSchedulingActions(filteredActionContext, actionMap, &executor)

for _, ac := range acsFiltered {
action, ok := avoidanceMaps[ac.ActionName]
for _, actionCtx := range filteredActionContext {
action, ok := actionMap[actionCtx.ActionName]
if !ok {
klog.Warningf("The action %s not found.", ac.ActionName)
klog.Warningf("The action %s not found.", actionCtx.ActionName)
continue
}

//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap)
throttlePods, throttleUpPods := s.getThrottlePods(actionCtx, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule)
combineThrottleWaterLine(&executor.ThrottleExecutor, actionCtx)
// combine the replicated pod
combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods)
combineThrottleDuplicate(&executor.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)
evictPods := s.getEvictPods(actionCtx.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&ae.EvictExecutor, ac)
combineEvictWaterLine(&executor.EvictExecutor, actionCtx)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
combineEvictDuplicate(&executor.EvictExecutor, evictPods)
}
}
ae.StateMap = stateMap
executor.StateMap = stateMap

klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor)
klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", executor.ThrottleExecutor, executor.EvictExecutor)

return ae
return executor
}

func (s *AnormalyAnalyzer) logEvent(ac ecache.ActionContext, now time.Time) {
Expand Down Expand Up @@ -388,12 +388,11 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
return false
}

func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext,
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {
func (s *AnormalyAnalyzer) getThrottlePods(actionCtx ecache.ActionContext, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {

throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{}

if !ac.Triggered && !(enableSchedule && ac.Restored) {
if !actionCtx.Triggered && !actionCtx.Restored {
return throttlePods, throttleUpPods
}

Expand All @@ -404,11 +403,11 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action
}

for _, pod := range allPods {
if ac.Triggered {
throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown))
if actionCtx.Triggered {
throttlePods = append(throttlePods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleDown))
}
if enableSchedule && ac.Restored {
throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp))
if actionCtx.Restored {
throttleUpPods = append(throttleUpPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleUp))
}
}

Expand All @@ -426,67 +425,53 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
}

for _, pod := range allPods {
evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict))
evictPods = append(evictPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.Evict))
}
}
return evictPods
}

func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) bool {
func (s *AnormalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) {
var now = time.Now()

// If any rules are triggered, the avoidance is true,otherwise the avoidance is false.
// If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
var enableScheduling, avoidance, restore bool

defer func() {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(utils.Bool2Int32(enableScheduling)))
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeAvoidance, float64(utils.Bool2Int32(avoidance)))
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeRestore, float64(utils.Bool2Int32(restore)))
}()

// If the ensurance rules are empty, it must be recovered soon.
// So we set enableScheduling true
if len(acsFiltered) == 0 {
enableScheduling = true
if len(actionContexts) == 0 {
s.ToggleScheduleSetting(ae, false)
} else {
for _, ac := range acsFiltered {
for _, ac := range actionContexts {
action, ok := avoidanceMaps[ac.ActionName]
if !ok {
klog.Warningf("DoMerge for detection,but the action %s not found", ac.ActionName)
continue
}

if ac.Triggered {
avoidance = true
enableScheduling = false
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0))
s.ToggleScheduleSetting(ae, true)
}

if ac.Restored {
restore = true
if !avoidance && now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds)*time.Second)) {
enableScheduling = true
if now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds) * time.Second)) {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1))
s.ToggleScheduleSetting(ae, false)
}
}
}
}
}

if avoidance {
s.lastTriggeredTime = now
ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

if enableScheduling {
ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
func (s *AnormalyAnalyzer) ToggleScheduleSetting(ae *executor.AvoidanceExecutor, toBeDisable bool) {
if toBeDisable {
s.lastTriggeredTime = time.Now()
}

return enableScheduling
ae.ScheduleExecutor.ToBeDisable = toBeDisable
ae.ScheduleExecutor.ToBeRestore = !ae.ScheduleExecutor.ToBeDisable
}

func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) {
for _, t := range throttlePods {
if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 {
if i := e.ThrottleDownPods.Find(t.Key); i == -1 {
e.ThrottleDownPods = append(e.ThrottleDownPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -500,7 +485,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}

for _, t := range throttleUpPods {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
if i := e.ThrottleUpPods.Find(t.Key); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -516,7 +501,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott

func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPods) {
for _, ep := range evictPods {
if i := e.EvictPods.Find(ep.PodKey); i == -1 {
if i := e.EvictPods.Find(ep.Key); i == -1 {
e.EvictPods = append(e.EvictPods, ep)
} else {
if (ep.DeletionGracePeriodSeconds != nil) && ((e.EvictPods[i].DeletionGracePeriodSeconds == nil) ||
Expand All @@ -527,8 +512,8 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) {
if !ac.Triggered && !(enableSchedule && ac.Restored) {
func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext) {
if !ac.Triggered && !ac.Restored {
return
}

Expand All @@ -551,7 +536,7 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}

if enableSchedule && ac.Restored {
if ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
Expand Down
Loading

0 comments on commit 01ea2e7

Please sign in to comment.