add waterLine for precise executor
kaiyuechen committed May 24, 2022
1 parent 87a49c2 commit 19d8fe7
Showing 17 changed files with 855 additions and 282 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/agent.go
@@ -88,8 +88,9 @@ func NewAgent(ctx context.Context,
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.GetStateFunc())
managers = appendManagerIfNotNil(managers, avoidanceManager)

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
tspName, err := agent.CreateNodeResourceTsp()
if err != nil {
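
For context on the new argument: NewActionExecutor now receives stateCollector.GetStateFunc(), a closure the executor can call whenever it needs a freshly collected metric snapshot instead of relying on the last snapshot pushed through the analyzer channel. Below is a minimal sketch of how such an injected getter could be consumed on the executor side; the actionExecutor type, its fields, and the simplified timeSeries stand-in are illustrative assumptions, not the actual executor implementation.

```go
package main

import "fmt"

// Simplified stand-in for common.TimeSeries: labels plus a single sample value.
type timeSeries struct {
	Labels map[string]string
	Value  float64
}

// getStateFunc mirrors the shape of the injected closure:
// func() map[string][]common.TimeSeries in the real code.
type getStateFunc func() map[string][]timeSeries

// actionExecutor is a hypothetical consumer of the injected getter.
type actionExecutor struct {
	stateFunc getStateFunc
}

// latestValue pulls a fresh snapshot and reads one metric, so throttle/evict
// decisions can be made against current usage rather than a stale message.
func (a *actionExecutor) latestValue(metric string) (float64, bool) {
	state := a.stateFunc()
	series, ok := state[metric]
	if !ok || len(series) == 0 {
		return 0, false
	}
	return series[0].Value, true
}

func main() {
	exec := &actionExecutor{
		stateFunc: func() map[string][]timeSeries {
			// In crane this closure re-collects (Collect(true)) before returning.
			return map[string][]timeSeries{
				"cpu_total_usage": {{Value: 3.2}},
			}
		},
	}
	if v, ok := exec.latestValue("cpu_total_usage"); ok {
		fmt.Printf("node cpu usage: %.1f cores\n", v)
	}
}
```
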
98 changes: 75 additions & 23 deletions pkg/ensurance/analyzer/analyzer.go
@@ -1,6 +1,7 @@
package analyzer

import (
"container/heap"
"fmt"
"strings"
"time"
@@ -22,9 +23,9 @@ import (
ensurancelisters "github.com/gocrane/api/pkg/generated/listers/ensurance/v1alpha1"
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
execsort "github.com/gocrane/crane/pkg/ensurance/analyzer/sort"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
"github.com/gocrane/crane/pkg/ensurance/executor"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
@@ -188,7 +189,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
return
}

//step 4 :notice the enforcer manager
//step 4 : notice the enforcer manager
s.notify(avoidanceAction)

return
@@ -296,26 +297,24 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida
//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule)
// combine the replicated pod
combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&ae.EvictExecutor, ac)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
}
}

// sort the throttle executor by pod qos priority
execsort.CpuMetricsSorter(ae.ThrottleExecutor.ThrottleDownPods)
execsort.CpuMetricsSorter(ae.ThrottleExecutor.ThrottleUpPods)
ae.ThrottleExecutor.ThrottleUpPods = executor.Reverse(ae.ThrottleExecutor.ThrottleUpPods)

// sort the evict executor by pod qos priority
execsort.CpuMetricsSorter(ae.EvictExecutor.EvictPods)

return ae
}
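
The merge step above now combines per-metric waterlines (see the new combineThrottleWaterLine/combineEvictWaterLine functions later in this file) and deduplicates pods matched by more than one policy via combineThrottleDuplicate/combineEvictDuplicate. Below is a minimal sketch of that deduplication idea, assuming pods are keyed by namespace/name; the helper name and key choice are illustrative, not the executor package's actual implementation.

```go
package main

import "fmt"

type podContext struct {
	Namespace string
	Name      string
}

func key(p podContext) string { return p.Namespace + "/" + p.Name }

// combineDuplicate appends candidates to existing while skipping pods that are
// already present, so a pod matched by several NodeQOSEnsurancePolicies is only
// throttled or evicted once.
func combineDuplicate(existing, candidates []podContext) []podContext {
	seen := make(map[string]struct{}, len(existing))
	for _, p := range existing {
		seen[key(p)] = struct{}{}
	}
	for _, p := range candidates {
		if _, ok := seen[key(p)]; !ok {
			seen[key(p)] = struct{}{}
			existing = append(existing, p)
		}
	}
	return existing
}

func main() {
	a := []podContext{{"default", "web-0"}}
	b := []podContext{{"default", "web-0"}, {"default", "web-1"}}
	fmt.Println(combineDuplicate(a, b)) // [{default web-0} {default web-1}]
}
```
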

@@ -367,9 +366,7 @@ func (s *AnormalyAnalyzer) getTimeSeriesFromMap(state []common.TimeSeries, selec
}

func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) {
//step1: check need to notice enforcer manager

//step2: notice by channel
//step1: notice by channel
s.actionCh <- as
return
}
@@ -389,7 +386,13 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
}

func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext,
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) (throttlePods []executor.PodContext, throttleUpPods []executor.PodContext) {
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {

throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{}

if !(ac.Triggered || (enableSchedule && ac.Restored)) {
return throttlePods, throttleUpPods
}

allPods, err := s.podLister.List(labels.Everything())
if err != nil {
@@ -399,18 +402,18 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action

for _, pod := range allPods {
if ac.Triggered {
throttlePods = append(throttlePods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Throttle))
throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown))
}
if enableSchedule && ac.Restored {
throttleUpPods = append(throttleUpPods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Throttle))
throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp))
}
}

return throttlePods, throttleUpPods
}

func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []executor.PodContext {
evictPods := []executor.PodContext{}
func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []podinfo.PodContext {
evictPods := []podinfo.PodContext{}

if triggered {
allPods, err := s.podLister.List(labels.Everything())
@@ -420,8 +423,7 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
}

for _, pod := range allPods {
evictPods = append(evictPods, executor.BuildPodBasicInfo(pod, stateMap, action, executor.Evict))

evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict))
}
}
return evictPods
@@ -432,7 +434,7 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon

// If any rules are triggered, the avoidance is true,otherwise the avoidance is false.
// If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
var enableScheduling, avoidance, restore bool

defer func() {
@@ -469,11 +471,11 @@

if avoidance {
s.lastTriggeredTime = now
ae.ScheduleExecutor.DisableClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

if enableScheduling {
ae.ScheduleExecutor.RestoreClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

return enableScheduling
@@ -521,3 +523,53 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}
}

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) {
if !(ac.Triggered || (enableSchedule && ac.Restored)) {
return
}
if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleDownWaterLine == nil {
e.ThrottleDownWaterLine = make(map[string]*executor.WaterLine)
}
// Use a heap here so we don't need <nepName>-<MetricRuleName> as the key; <MetricRuleName> alone is enough
if e.ThrottleDownWaterLine[ensurance.MetricRule.Name] == nil {
e.ThrottleDownWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
}
heap.Push(e.ThrottleDownWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
}
}
}

if enableSchedule && ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
e.ThrottleUpWaterLine = make(map[string]*executor.WaterLine)
}
if e.ThrottleUpWaterLine[ensurance.MetricRule.Name] == nil {
e.ThrottleUpWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
}
heap.Push(e.ThrottleUpWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
}
}
}
}

func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.EvictWaterLine == nil {
e.EvictWaterLine = make(map[string]*executor.WaterLine)
}
if e.EvictWaterLine[ensurance.MetricRule.Name] == nil {
e.EvictWaterLine[ensurance.MetricRule.Name] = &executor.WaterLine{}
}
heap.Push(e.EvictWaterLine[ensurance.MetricRule.Name], ensurance.MetricRule.Value)
}
}
}
}
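
The heap.Push calls above imply that executor.WaterLine implements container/heap's heap.Interface over metric threshold values, with one heap per MetricRule.Name, so thresholds coming from several NodeQOSEnsurancePolicies for the same metric can be merged. A self-contained sketch of that pattern follows; float64 stands in for the resource.Quantity values the real WaterLine stores, and the min-heap ordering is an assumption.

```go
package main

import (
	"container/heap"
	"fmt"
)

// waterLine is a heap of thresholds for one metric rule name.
type waterLine []float64

func (w waterLine) Len() int            { return len(w) }
func (w waterLine) Less(i, j int) bool  { return w[i] < w[j] } // min-heap: smallest threshold at the root
func (w waterLine) Swap(i, j int)       { w[i], w[j] = w[j], w[i] }
func (w *waterLine) Push(x interface{}) { *w = append(*w, x.(float64)) }
func (w *waterLine) Pop() interface{} {
	old := *w
	n := len(old)
	x := old[n-1]
	*w = old[:n-1]
	return x
}

func main() {
	// One heap per metric rule name, as in ThrottleDownWaterLine / EvictWaterLine.
	lines := map[string]*waterLine{}
	for _, v := range []float64{2.0, 1.5, 3.0} { // thresholds from several NEPs on the same metric
		if lines["cpu_total_usage"] == nil {
			lines["cpu_total_usage"] = &waterLine{}
		}
		heap.Push(lines["cpu_total_usage"], v)
	}
	fmt.Println((*lines["cpu_total_usage"])[0]) // prints 1.5, the lowest waterline for this metric
}
```
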
27 changes: 21 additions & 6 deletions pkg/ensurance/collector/collector.go
@@ -36,6 +36,8 @@ type StateCollector struct {
AnalyzerChann chan map[string][]common.TimeSeries
NodeResourceChann chan map[string][]common.TimeSeries
PodResourceChann chan map[string][]common.TimeSeries
state map[string][]common.TimeSeries
rw sync.RWMutex
}

func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsurancePolicyLister, podLister corelisters.PodLister,
@@ -95,7 +97,7 @@ func (s *StateCollector) Run(stop <-chan struct{}) {
start := time.Now()
metrics.UpdateLastTime(string(known.ModuleStateCollector), metrics.StepMain, start)
s.healthCheck.UpdateLastActivity(start)
s.Collect()
s.Collect(false)
metrics.UpdateDurationFromStart(string(known.ModuleStateCollector), metrics.StepMain, start)
case <-stop:
klog.Infof("StateCollector exit")
@@ -107,12 +109,11 @@
return
}

func (s *StateCollector) Collect() {
func (s *StateCollector) Collect(waterLine bool) {
wg := sync.WaitGroup{}
start := time.Now()

var data = make(map[string][]common.TimeSeries)
var mux sync.Mutex

s.collectors.Range(func(key, value interface{}) bool {
c := value.(Collector)
@@ -125,11 +126,11 @@
defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleStateCollector), string(c.GetType()), metrics.StepCollect, start)

if cdata, err := c.Collect(); err == nil {
mux.Lock()
s.rw.Lock()
for key, series := range cdata {
data[key] = series
}
mux.Unlock()
s.rw.Unlock()
}
}(c, data)

@@ -138,7 +139,12 @@

wg.Wait()

s.AnalyzerChann <- data
// When Collect is called by the waterline-related logic rather than by StateCollector.Run, AnalyzerChann should not be updated, otherwise the update would trigger recursive analyze and execute cycles
if !waterLine {
s.AnalyzerChann <- data
}

s.state = data

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
s.NodeResourceChann <- data
@@ -245,3 +251,12 @@ func CheckMetricNameExist(name string) bool {

return false
}

func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries {
return func() map[string][]common.TimeSeries {
s.Collect(true)
s.rw.RLock()
defer s.rw.RUnlock()
return s.state
}
}
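
GetStateFunc returns a closure rather than the map itself so callers always receive freshly collected data: the closure re-runs Collect(true), which skips the analyzer channel as noted above, then reads the shared state map under the read lock while collector goroutines write it under the write lock. Below is a condensed, self-contained illustration of the same pattern with simplified types; the collector struct is a stand-in, not the real StateCollector.

```go
package main

import (
	"fmt"
	"sync"
)

type collector struct {
	rw    sync.RWMutex
	state map[string]float64
}

func (c *collector) collect() {
	c.rw.Lock()
	defer c.rw.Unlock()
	// Real collectors gather node/container metrics concurrently; simplified here.
	c.state = map[string]float64{"cpu_total_usage": 3.2}
}

// getStateFunc mirrors the shape of StateCollector.GetStateFunc: callers always
// get a freshly collected snapshot without going through the analyzer channel.
func (c *collector) getStateFunc() func() map[string]float64 {
	return func() map[string]float64 {
		c.collect() // corresponds to s.Collect(true)
		c.rw.RLock()
		defer c.rw.RUnlock()
		return c.state
	}
}

func main() {
	c := &collector{}
	getState := c.getStateFunc()
	fmt.Println(getState()["cpu_total_usage"]) // 3.2
}
```
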
1 change: 1 addition & 0 deletions pkg/ensurance/collector/types/types.go
@@ -46,6 +46,7 @@ const (
MetricNetworkDropIn MetricName = "network_drop_in"
MetricNetworkDropOut MetricName = "network_drop_out"

// Attention: this value is cpuUsageIncrease/timeIncrease, not cpuUsage
MetricNameContainerCpuTotalUsage MetricName = "container_cpu_total_usage"
MetricNameContainerCpuLimit MetricName = "container_cpu_limit"
MetricNameContainerCpuQuota MetricName = "container_cpu_quota"
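
To make the new comment concrete: container_cpu_total_usage is reported as a rate, the increase in cumulative CPU time divided by the elapsed wall-clock time between two samples, rather than the raw cumulative counter. A small illustrative calculation (function name and sample values are hypothetical):

```go
package main

import "fmt"

// cpuUsageRate turns two cumulative CPU-time samples into a usage rate:
// (usage increase) / (time increase), i.e. cores in use over that window.
func cpuUsageRate(prevUsageSeconds, currUsageSeconds, prevTs, currTs float64) float64 {
	if currTs <= prevTs {
		return 0
	}
	return (currUsageSeconds - prevUsageSeconds) / (currTs - prevTs)
}

func main() {
	// 1.5 CPU-seconds consumed over a 1-second window means roughly 1.5 cores in use.
	fmt.Printf("%.2f cores\n", cpuUsageRate(100.0, 101.5, 2000.0, 2001.0))
}
```
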