Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine cpu sorter of crane agent #487

Merged
merged 1 commit into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 55 additions & 70 deletions pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
"github.com/gocrane/crane/pkg/ensurance/executor"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
Expand Down Expand Up @@ -167,16 +167,16 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
for _, n := range neps {
for _, v := range n.Spec.ObjectiveEnsurances {
var key = strings.Join([]string{n.Name, v.Name}, ".")
ac, err := s.analyze(key, v, state)
actionContext, err := s.analyze(key, v, state)
if err != nil {
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAnalyzeError, key, 1.0)
klog.Errorf("Failed to analyze, %v.", err)
}
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(ac.Triggered)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(ac.Restored)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(actionContext.Triggered)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(actionContext.Restored)))

ac.Nep = n
actionContexts = append(actionContexts, ac)
actionContext.Nep = n
actionContexts = append(actionContexts, actionContext)
}
}

Expand Down Expand Up @@ -222,17 +222,17 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
}

func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsurance, stateMap map[string][]common.TimeSeries) (ecache.ActionContext, error) {
var ac = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName}
var actionContext = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName}

state, ok := stateMap[object.MetricRule.Name]
if !ok {
return ac, fmt.Errorf("metric %s not found", object.MetricRule.Name)
return actionContext, fmt.Errorf("metric %s not found", object.MetricRule.Name)
}

//step1: get series from value
series, err := s.getSeries(state, object.MetricRule.Selector, object.MetricRule.Name)
if err != nil {
return ac, err
return actionContext, err
}

//step2: check if triggered for NodeQOSEnsurance
Expand All @@ -241,9 +241,9 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &ac)
s.computeActionContext(threshold, key, object, &actionContext)

return ac, nil
return actionContext, nil
}

func (s *AnormalyAnalyzer) computeActionContext(threshold bool, key string, object ensuranceapi.ObjectiveEnsurance, ac *ecache.ActionContext) {
Expand Down Expand Up @@ -278,47 +278,47 @@ func (s *AnormalyAnalyzer) filterDryRun(acs []ecache.ActionContext) []ecache.Act
return dcsFiltered
}

func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, acs []ecache.ActionContext) executor.AvoidanceExecutor {
var ae executor.AvoidanceExecutor
func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, actionMap map[string]*ensuranceapi.AvoidanceAction, actionContexts []ecache.ActionContext) executor.AvoidanceExecutor {
var executor executor.AvoidanceExecutor

//step1 filter dry run ActionContext
acsFiltered := s.filterDryRun(acs)
filteredActionContext := s.filterDryRun(actionContexts)

//step2 do DisableScheduled merge
enableSchedule := s.disableSchedulingMerge(acsFiltered, avoidanceMaps, &ae)
s.mergeSchedulingActions(filteredActionContext, actionMap, &executor)

for _, ac := range acsFiltered {
action, ok := avoidanceMaps[ac.ActionName]
for _, actionCtx := range filteredActionContext {
action, ok := actionMap[actionCtx.ActionName]
if !ok {
klog.Warningf("The action %s not found.", ac.ActionName)
klog.Warningf("The action %s not found.", actionCtx.ActionName)
continue
}

//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap)
throttlePods, throttleUpPods := s.getThrottlePods(actionCtx, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule)
combineThrottleWaterLine(&executor.ThrottleExecutor, actionCtx)
// combine the replicated pod
combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods)
combineThrottleDuplicate(&executor.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)
evictPods := s.getEvictPods(actionCtx.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&ae.EvictExecutor, ac)
combineEvictWaterLine(&executor.EvictExecutor, actionCtx)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
combineEvictDuplicate(&executor.EvictExecutor, evictPods)
}
}
ae.StateMap = stateMap
executor.StateMap = stateMap

klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor)
klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", executor.ThrottleExecutor, executor.EvictExecutor)

return ae
return executor
}

func (s *AnormalyAnalyzer) logEvent(ac ecache.ActionContext, now time.Time) {
Expand Down Expand Up @@ -388,12 +388,11 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
return false
}

func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext,
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {
func (s *AnormalyAnalyzer) getThrottlePods(actionCtx ecache.ActionContext, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {

throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{}

if !ac.Triggered && !(enableSchedule && ac.Restored) {
if !actionCtx.Triggered && !actionCtx.Restored {
return throttlePods, throttleUpPods
}

Expand All @@ -404,11 +403,11 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action
}

for _, pod := range allPods {
if ac.Triggered {
throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown))
if actionCtx.Triggered {
throttlePods = append(throttlePods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleDown))
}
if enableSchedule && ac.Restored {
throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp))
if actionCtx.Restored {
throttleUpPods = append(throttleUpPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleUp))
}
}

Expand All @@ -426,67 +425,53 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
}

for _, pod := range allPods {
evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict))
evictPods = append(evictPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.Evict))
}
}
return evictPods
}

func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) bool {
func (s *AnormalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) {
var now = time.Now()

// If any rules are triggered, the avoidance is true,otherwise the avoidance is false.
// If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
var enableScheduling, avoidance, restore bool

defer func() {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(utils.Bool2Int32(enableScheduling)))
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeAvoidance, float64(utils.Bool2Int32(avoidance)))
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeRestore, float64(utils.Bool2Int32(restore)))
}()

// If the ensurance rules are empty, it must be recovered soon.
// So we set enableScheduling true
if len(acsFiltered) == 0 {
enableScheduling = true
if len(actionContexts) == 0 {
s.ToggleScheduleSetting(ae, false)
} else {
for _, ac := range acsFiltered {
for _, ac := range actionContexts {
action, ok := avoidanceMaps[ac.ActionName]
if !ok {
klog.Warningf("DoMerge for detection,but the action %s not found", ac.ActionName)
continue
}

if ac.Triggered {
avoidance = true
enableScheduling = false
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0))
s.ToggleScheduleSetting(ae, true)
}

if ac.Restored {
restore = true
if !avoidance && now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds)*time.Second)) {
enableScheduling = true
if now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds) * time.Second)) {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1))
s.ToggleScheduleSetting(ae, false)
}
}
}
}
}

if avoidance {
s.lastTriggeredTime = now
ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

if enableScheduling {
ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
func (s *AnormalyAnalyzer) ToggleScheduleSetting(ae *executor.AvoidanceExecutor, toBeDisable bool) {
if toBeDisable {
s.lastTriggeredTime = time.Now()
}

return enableScheduling
ae.ScheduleExecutor.ToBeDisable = toBeDisable
ae.ScheduleExecutor.ToBeRestore = !ae.ScheduleExecutor.ToBeDisable
}

func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) {
for _, t := range throttlePods {
if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 {
if i := e.ThrottleDownPods.Find(t.Key); i == -1 {
e.ThrottleDownPods = append(e.ThrottleDownPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -500,7 +485,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}

for _, t := range throttleUpPods {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
if i := e.ThrottleUpPods.Find(t.Key); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -516,7 +501,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott

func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPods) {
for _, ep := range evictPods {
if i := e.EvictPods.Find(ep.PodKey); i == -1 {
if i := e.EvictPods.Find(ep.Key); i == -1 {
e.EvictPods = append(e.EvictPods, ep)
} else {
if (ep.DeletionGracePeriodSeconds != nil) && ((e.EvictPods[i].DeletionGracePeriodSeconds == nil) ||
Expand All @@ -527,8 +512,8 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) {
if !ac.Triggered && !(enableSchedule && ac.Restored) {
func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext) {
if !ac.Triggered && !ac.Restored {
return
}

Expand All @@ -551,7 +536,7 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}

if enableSchedule && ac.Restored {
if ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
Expand Down
Loading