Skip to content

Commit

Permalink
Sort pods to be executed by multi dimensions; Sort func orders can be…
Browse files Browse the repository at this point in the history
… customized
  • Loading branch information
kaiyuechen committed Mar 13, 2022
1 parent 2a39201 commit d42e963
Show file tree
Hide file tree
Showing 13 changed files with 578 additions and 172 deletions.
4 changes: 2 additions & 2 deletions cmd/crane-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func Run(ctx context.Context, opts *options.Options) error {
nepInformer.Informer()
actionInformer.Informer()

agent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, kubeClient, craneClient,
podInformer, nodeInformer, nepInformer, actionInformer, opts.Ifaces, healthCheck, opts.CollectInterval)
agent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, kubeClient, craneClient, podInformer, nodeInformer,
nepInformer, actionInformer, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ThrottleSortSequences, opts.EvictSortSequences)
nepInformer.Informer()
actionInformer.Informer()

Expand Down
6 changes: 6 additions & 0 deletions cmd/crane-agent/app/options/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Options struct {
MaxInactivity time.Duration
// Ifaces is the network devices to collect metric
Ifaces []string
// ThrottleSortSequences is the sequence of sort method for throttling pod
ThrottleSortSequences []string
// EvictSortSequences is the sequence of sort method for evicting pod
EvictSortSequences []string
}

// NewOptions builds an empty options.
Expand All @@ -48,4 +52,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.DurationVar(&o.CollectInterval, "collect-interval", 10*time.Second, "period for the state collector to collect metrics, default: 10s")
flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0")
flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
flags.StringArrayVar(&o.ThrottleSortSequences, "throttole-sort", []string{}, "Define the sequences of sort method for throttling pod, such as ExtResourceEnable,ClassAndPriority...")
flags.StringArrayVar(&o.EvictSortSequences, "evict-sort", []string{}, "Define the sequences of sort method for evicting pod, such as ExtResourceEnable,ClassAndPriority...")
}
8 changes: 6 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ func NewAgent(ctx context.Context,
ifaces []string,
healthCheck *metrics.HealthCheck,
CollectInterval time.Duration,
throttleSortSequences []string,
evictSortSequences []string,
) (*Agent, error) {
var managers []manager.Manager
var noticeCh = make(chan executor.AvoidanceExecutor)

utilruntime.Must(ensuranceapi.AddToScheme(scheme.Scheme))

stateCollector := collector.NewStateCollector(nodeName, nepInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval)
stateCollector := collector.NewStateCollector(nodeName, nepInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(),
ifaces, healthCheck, CollectInterval)
managers = append(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.StateChann, noticeCh)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer,
stateCollector.StateChann, noticeCh, throttleSortSequences, evictSortSequences)
managers = append(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
managers = append(managers, avoidanceManager)
Expand Down
55 changes: 33 additions & 22 deletions pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package analyzer

import (
"fmt"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -24,8 +22,8 @@ import (
ensurancelisters "github.com/gocrane/api/pkg/generated/listers/ensurance/v1alpha1"
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
execsort "github.com/gocrane/crane/pkg/ensurance/analyzer/sort"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
stypes "github.com/gocrane/crane/pkg/ensurance/collector/types"
"github.com/gocrane/crane/pkg/ensurance/executor"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
Expand Down Expand Up @@ -56,6 +54,9 @@ type AnormalyAnalyzer struct {
restored map[string]uint64
actionEventStatus map[string]ecache.DetectionStatus
lastTriggeredTime time.Time

evictRankFunc execsort.EvictRankFunc
throttleRankFunc execsort.ThrottleRankFunc
}

// NewAnormalyAnalyzer create an analyzer manager
Expand All @@ -67,13 +68,19 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
actionInformer v1alpha1.AvoidanceActionInformer,
stateChann chan map[string][]common.TimeSeries,
noticeCh chan<- executor.AvoidanceExecutor,
throttleSortSequences []string,
evictSortSequences []string,
) *AnormalyAnalyzer {

expressionEvaluator := evaluator.NewExpressionEvaluator()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "crane-agent"})
evictRankFunc := execsort.EvictRankFuncConstruct(evictSortSequences)

// TODO: After review, add custom option like evictRankFunc
throttleRankFunc := execsort.ThrottleRankFuncConstruct()
return &AnormalyAnalyzer{
nodeName: nodeName,
evaluator: expressionEvaluator,
Expand All @@ -91,6 +98,8 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
triggered: make(map[string]uint64),
restored: make(map[string]uint64),
actionEventStatus: make(map[string]ecache.DetectionStatus),
evictRankFunc: evictRankFunc,
throttleRankFunc: throttleRankFunc,
}
}

Expand Down Expand Up @@ -307,18 +316,19 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action)
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
}
}

// sort the throttle executor by pod qos priority
sort.Sort(ae.ThrottleExecutor.ThrottleDownPods)
sort.Sort(sort.Reverse(ae.ThrottleExecutor.ThrottleUpPods))
s.throttleRankFunc(ae.ThrottleExecutor.ThrottleDownPods)
s.throttleRankFunc(ae.ThrottleExecutor.ThrottleUpPods)
ae.ThrottleExecutor.ThrottleUpPods = executor.Reverse(ae.ThrottleExecutor.ThrottleUpPods)

// sort the evict executor by pod qos priority
sort.Sort(ae.EvictExecutor.EvictPods)
s.evictRankFunc(ae.EvictExecutor.EvictPods)

return ae
}
Expand Down Expand Up @@ -415,7 +425,7 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action
return throttlePods, throttleUpPods
}

func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction) []executor.EvictPod {
func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []executor.EvictPod {
evictPods := []executor.EvictPod{}

if triggered {
Expand All @@ -425,10 +435,9 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
return evictPods
}

for _, v := range allPods {
var classAndPriority = executor.ClassAndPriority{PodQOSClass: v.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(v.Spec.Priority, 0)}
evictPods = append(evictPods, executor.EvictPod{DeletionGracePeriodSeconds: action.Spec.Eviction.TerminationGracePeriodSeconds,
PodKey: types.NamespacedName{Name: v.Name, Namespace: v.Namespace}, ClassAndPriority: classAndPriority})
for _, pod := range allPods {
evictPods = append(evictPods, evictPodConstruct(pod, stateMap, action))

}
}
return evictPods
Expand Down Expand Up @@ -488,24 +497,26 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon

func throttlePodConstruct(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction) executor.ThrottlePod {
var throttlePod executor.ThrottlePod
var qosPriority = executor.ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)}

throttlePod.PodTypes = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
throttlePod.PodBasicInfo = executor.PodBasicInfoConstruct(pod, stateMap)
throttlePod.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio)
throttlePod.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio)

throttlePod.PodCPUUsage, throttlePod.ContainerCPUUsages = executor.GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod)
throttlePod.PodCPUShare, throttlePod.ContainerCPUShares = executor.GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod)
throttlePod.PodCPUQuota, throttlePod.ContainerCPUQuotas = executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod)
throttlePod.PodCPUPeriod, throttlePod.ContainerCPUPeriods = executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod)
throttlePod.PodQOSPriority = qosPriority

return throttlePod
}

func evictPodConstruct(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction) executor.EvictPod {
var evictPod executor.EvictPod

evictPod.PodBasicInfo = executor.PodBasicInfoConstruct(pod, stateMap)
evictPod.DeletionGracePeriodSeconds = action.Spec.Eviction.TerminationGracePeriodSeconds

return evictPod
}

func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) {
for _, t := range throttlePods {
if i := e.ThrottleDownPods.Find(t.PodTypes); i == -1 {
if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 {
e.ThrottleDownPods = append(e.ThrottleDownPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -519,7 +530,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}
for _, t := range throttleUpPods {

if i := e.ThrottleUpPods.Find(t.PodTypes); i == -1 {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio {
Expand Down
182 changes: 182 additions & 0 deletions pkg/ensurance/analyzer/sort/evict-sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package sort

import (
"sort"

v1 "k8s.io/api/core/v1"

"github.com/gocrane/crane/pkg/ensurance/executor"
"github.com/gocrane/crane/pkg/utils"
)

// EvictRankFunc sorts the pods in eviction order.
type EvictRankFunc func(pods []executor.EvictPod)

// evictSortFunc maps a sort-method name (as accepted by the --evict-sort
// command-line flag) to its pairwise comparison function. Names not
// present in this map are ignored by EvictRankFuncConstruct.
var evictSortFunc = map[string]func(p1, p2 executor.EvictPod) int{
	"ExtResourceEnable": evictUseExtResource,
	"ClassAndPriority":  evictClassAndPriority,
	"ExtResourceUsage":  evictExtCpuUsed2Limit,
	"CpuUsage":          evictCpuUsage,
	"RunningTime":       evictStartTime,
}

// EvictRankFuncConstruct builds the eviction ordering function from the
// user-supplied sequence of sort-method names (keys of evictSortFunc,
// applied in the given priority order). Unknown names are skipped.
// When the sequence is empty, or none of its names are recognized, the
// default CPU-pressure ordering (evictRankCpuPressure) is returned.
func EvictRankFuncConstruct(customize []string) EvictRankFunc {
	// NOTE: the previous version guarded this loop with
	// len(customize) == 0 (dead code) and returned inside the loop
	// after the first element, so customization never took effect.
	if len(customize) != 0 {
		var cmp []cmpEvictFunc
		for _, sortFunc := range customize {
			if f, ok := evictSortFunc[sortFunc]; ok {
				cmp = append(cmp, f)
			}
		}
		// Only honor the customization if at least one name matched;
		// an empty comparator chain would panic in Less.
		if len(cmp) != 0 {
			return evictOrderedBy(cmp...).Sort
		}
	}
	return evictRankCpuPressure
}

// evictRankCpuPressure is the default eviction ordering under CPU
// pressure: ext-resource users first, then QOS class and priority,
// then ext-CPU utilization, then CPU usage, then start time.
func evictRankCpuPressure(pods []executor.EvictPod) {
	comparators := []cmpEvictFunc{
		evictUseExtResource,
		evictClassAndPriority,
		evictExtCpuUsed2Limit,
		evictCpuUsage,
		evictStartTime,
	}
	evictOrderedBy(comparators...).Sort(pods)
}

// evictStartTime compares pods by start time: a later-started (younger)
// pod compares greater (returns 1). A nil start time is treated as the
// youngest of all — presumably the pod is just starting (TODO confirm
// against the state collector).
func evictStartTime(p1, p2 executor.EvictPod) int {
	t1, t2 := p1.StartTime, p2.StartTime

	switch {
	case t1 == nil && t2 == nil:
		return 0
	case t1 == nil:
		// maybe p1 is just starting
		return 1
	case t2 == nil:
		return -1
	case t1.Before(t2):
		return 1
	case t1.Equal(t2):
		return 0
	default:
		return -1
	}
}

// evictClassAndPriority compares pods by their QOS class and priority,
// delegating the ordering to executor.CompareClassAndPriority.
func evictClassAndPriority(p1, p2 executor.EvictPod) int {
	result := executor.CompareClassAndPriority(p1.ClassAndPriority, p2.ClassAndPriority)
	return int(result)
}

// evictUseExtResource compares pods by whether they use ext (elastic)
// resources: a pod that uses them compares greater than one that does
// not, and two pods with the same flag compare equal.
func evictUseExtResource(p1, p2 executor.EvictPod) int {
	u1 := utils.TransBool2Uint(p1.UseExtResource)
	u2 := utils.TransBool2Uint(p2.UseExtResource)

	switch {
	case u1 < u2:
		return -1
	case u1 > u2:
		return 1
	default:
		return 0
	}
}

// evictExtCpuUsed2Limit compares pods by the ratio of CPU usage to the
// pod's ext CPU limit. The criterion only applies when both pods use
// ext resources and both have a non-zero limit; in any other case the
// pods compare equal so the next comparator in the chain decides.
// (Whether a pod uses ext resources at all is already ranked by
// evictUseExtResource.)
func evictExtCpuUsed2Limit(p1, p2 executor.EvictPod) int {
	// If either pod doesn't use ext resources this ratio is undefined
	// for it; treat the pair as equal instead of dividing by a zero
	// limit below. (The previous && guard let mixed pairs through.)
	if !p1.UseExtResource || !p2.UseExtResource {
		return 0
	}
	// Guard the division: in Go, float division by zero yields ±Inf
	// rather than panicking, which would silently corrupt the ordering.
	if p1.ExtCpuLimit == 0 || p2.ExtCpuLimit == 0 {
		return 0
	}

	p1Ratio := p1.PodCPUUsage / float64(p1.ExtCpuLimit)
	p2Ratio := p2.PodCPUUsage / float64(p2.ExtCpuLimit)

	switch {
	case utils.AlmostEqual(p1Ratio, p2Ratio):
		return 0
	case p1Ratio < p2Ratio:
		return -1
	default:
		return 1
	}
}

// evictCpuUsage compares pods by CPU usage. When both pods are
// best-effort (no CPU limits) the absolute usage is compared;
// otherwise usage is normalized by the pod's CPU quota
// (usage * period / quota). If either quota is 0 — i.e. no CPU limit
// is set — the normalization would divide by zero (yielding ±Inf, not
// a panic, in Go), so absolute usage is used as a fallback.
func evictCpuUsage(p1, p2 executor.EvictPod) int {
	var p1usage, p2usage float64

	bothBestEffort := p1.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort &&
		p2.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort
	if bothBestEffort || p1.PodCPUQuota == 0 || p2.PodCPUQuota == 0 {
		p1usage = p1.PodCPUUsage
		p2usage = p2.PodCPUUsage
	} else {
		p1usage = p1.PodCPUUsage * p1.PodCPUPeriod / p1.PodCPUQuota
		p2usage = p2.PodCPUUsage * p2.PodCPUPeriod / p2.PodCPUQuota
	}

	switch {
	case utils.AlmostEqual(p1usage, p2usage):
		return 0
	case p1usage < p2usage:
		return -1
	default:
		return 1
	}
}

// cmpEvictFunc compares p1 and p2 and returns:
//
//	-1 if p1 < p2
//	 0 if p1 == p2
//	+1 if p1 > p2
type cmpEvictFunc func(p1, p2 executor.EvictPod) int

// evictSorter implements sort.Interface, ordering a pod slice by a
// chain of comparison functions applied in priority order.
type evictSorter struct {
	pods []executor.EvictPod // the slice currently being sorted
	cmp  []cmpEvictFunc      // comparators, highest priority first
}

// Sort sorts the argument slice in place according to the less
// functions passed to evictOrderedBy.
func (ms *evictSorter) Sort(pods []executor.EvictPod) {
	ms.pods = pods
	sort.Sort(ms)
}

// evictOrderedBy returns an evictSorter that sorts using the given
// comparison functions, in order. Call its Sort method to sort a pod
// slice.
func evictOrderedBy(cmp ...cmpEvictFunc) *evictSorter {
	sorter := &evictSorter{cmp: cmp}
	return sorter
}

// Len is part of sort.Interface.
func (ms *evictSorter) Len() int {
	return len(ms.pods)
}

// Swap is part of sort.Interface.
func (ms *evictSorter) Swap(i, j int) {
	ms.pods[i], ms.pods[j] = ms.pods[j], ms.pods[i]
}

// Less is part of sort.Interface. It applies the comparison functions
// in order: the first one that distinguishes the two pods decides, and
// the last one acts as the final tie-breaker.
func (ms *evictSorter) Less(i, j int) bool {
	p1, p2 := ms.pods[i], ms.pods[j]
	last := len(ms.cmp) - 1
	for k := 0; k < last; k++ {
		switch result := ms.cmp[k](p1, p2); {
		case result < 0:
			// p1 sorts before p2
			return true
		case result > 0:
			// p2 sorts before p1
			return false
		}
		// still tied; fall through to the next comparator
	}
	// the last comparison function is the final decider
	return ms.cmp[last](p1, p2) < 0
}
Loading

0 comments on commit d42e963

Please sign in to comment.