Precise execution according to waterline #216

Merged: 1 commit, Jun 20, 2022
4 changes: 2 additions & 2 deletions cmd/crane-agent/app/agent.go
@@ -99,8 +99,8 @@ func Run(ctx context.Context, opts *options.Options) error {
actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions()
tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient,
podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval)
newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient, podInformer, nodeInformer,
nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)

if err != nil {
return err
3 changes: 3 additions & 0 deletions cmd/crane-agent/app/options/option.go
@@ -26,6 +26,8 @@ type Options struct {
// Ifaces is the network devices to collect metric
Ifaces []string
NodeResourceReserved map[string]string
// ExecuteExcess is the percentage of executions that exceed the gap between current usage and waterlines
ExecuteExcess string
}

// NewOptions builds an empty options.
@@ -54,4 +56,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0")
flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")
flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
flags.StringVar(&o.ExecuteExcess, "execute-excess", "10%", "The percentage of executions that exceed the gap between current usage and waterlines, default: 10%.")
}
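The new execute-excess flag carries a percentage as a string. A minimal sketch of how such a value can be parsed and applied to the gap between current usage and the waterline; parseExcess and the numbers here are illustrative assumptions, not crane's actual implementation:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseExcess converts a string such as "10%" into a multiplier (0.10).
func parseExcess(s string) (float64, error) {
	v, err := strconv.ParseFloat(strings.TrimSuffix(s, "%"), 64)
	if err != nil {
		return 0, fmt.Errorf("invalid execute-excess %q: %v", s, err)
	}
	return v / 100, nil
}

func main() {
	excess, _ := parseExcess("10%")
	usage, waterline := 3.6, 3.0 // CPU cores
	// Throttle or evict slightly more than the raw gap so usage settles
	// below the waterline instead of oscillating right at it.
	target := (usage - waterline) * (1 + excess)
	fmt.Printf("release %.2f cores\n", target) // release 0.66 cores
}

Overshooting by a small margin is the usual remedy for flapping: executing exactly to the waterline leaves the node on the trigger boundary, where the next sample can re-trigger the action.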
4 changes: 3 additions & 1 deletion pkg/agent/agent.go
@@ -65,6 +65,7 @@ func NewAgent(ctx context.Context,
ifaces []string,
healthCheck *metrics.HealthCheck,
CollectInterval time.Duration,
executeExcess string,
) (*Agent, error) {
var managers []manager.Manager
var noticeCh = make(chan executor.AvoidanceExecutor)
@@ -88,8 +89,9 @@
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess)
managers = appendManagerIfNotNil(managers, avoidanceManager)

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
tspName, err := agent.CreateNodeResourceTsp()
if err != nil {
150 changes: 100 additions & 50 deletions pkg/ensurance/analyzer/analyzer.go
@@ -1,15 +1,14 @@
package analyzer

import (
"container/heap"
"fmt"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -25,8 +24,8 @@ import (
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
stypes "github.com/gocrane/crane/pkg/ensurance/collector/types"
"github.com/gocrane/crane/pkg/ensurance/executor"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
@@ -74,6 +73,7 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "crane-agent"})

return &AnormalyAnalyzer{
nodeName: nodeName,
evaluator: expressionEvaluator,
@@ -180,7 +180,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
}
}

klog.V(6).Infof("Analyze actionContexts: %v", actionContexts)
klog.V(6).Infof("Analyze actionContexts: %#v", actionContexts)

//step 3 : merge
avoidanceAction := s.merge(state, avoidanceMaps, actionContexts)
Expand All @@ -189,7 +189,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
return
}

//step 4 :notice the enforcer manager
//step 4 : notice the enforcer manager
s.notify(avoidanceAction)

return
@@ -208,7 +208,7 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
for _, ts := range series {
triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value)

klog.V(6).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
klog.V(4).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
object.MetricRule.Name,
ts.Samples[0].Value,
common.GetValueByName(ts.Labels, common.LabelNamePodNamespace),
@@ -238,7 +238,7 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu
//step2: check if triggered for NodeQOSEnsurance
threshold := s.trigger(series, object)

klog.V(4).Infof("for NodeQOS %s, metrics reach the threshold: %v", key, threshold)
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &ac)
@@ -297,24 +297,26 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida
//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule)
// combine the replicated pod
combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action)
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&ae.EvictExecutor, ac)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
}
}
ae.StateMap = stateMap

// sort the throttle executor by pod qos priority
sort.Sort(ae.ThrottleExecutor.ThrottleDownPods)
sort.Sort(sort.Reverse(ae.ThrottleExecutor.ThrottleUpPods))

// sort the evict executor by pod qos priority
sort.Sort(ae.EvictExecutor.EvictPods)
klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor)

return ae
}
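The sort.Sort calls above order pods by QoS priority so that throttling and eviction act on the least important pods first, while sort.Reverse makes ThrottleUpPods restore the most important pods first. A minimal sketch of the comparison they plausibly rely on; qosRank and the less helper are illustrative assumptions, not crane's exact code:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// Lower rank means the pod is sacrificed earlier (assumed ordering).
var qosRank = map[v1.PodQOSClass]int{
	v1.PodQOSBestEffort: 0,
	v1.PodQOSBurstable:  1,
	v1.PodQOSGuaranteed: 2,
}

type ClassAndPriority struct {
	PodQOSClass        v1.PodQOSClass
	PriorityClassValue int32
}

// less reports whether p should be acted on before q when throttling down:
// compare QoS class first, then the numeric priority class value.
func less(p, q ClassAndPriority) bool {
	if qosRank[p.PodQOSClass] != qosRank[q.PodQOSClass] {
		return qosRank[p.PodQOSClass] < qosRank[q.PodQOSClass]
	}
	return p.PriorityClassValue < q.PriorityClassValue
}

func main() {
	be := ClassAndPriority{v1.PodQOSBestEffort, 0}
	gu := ClassAndPriority{v1.PodQOSGuaranteed, 1000}
	fmt.Println(less(be, gu)) // true: BestEffort pods are throttled first
}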
@@ -367,9 +369,7 @@ func (s *AnormalyAnalyzer) getTimeSeriesFromMap(state []common.TimeSeries, selec
}

func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) {
//step1: check need to notice enforcer manager

//step2: notice by channel
//step1: notice by channel
s.actionCh <- as
return
}
@@ -389,9 +389,13 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
}

func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext,
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]executor.ThrottlePod, []executor.ThrottlePod) {
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {

throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{}

throttlePods, throttleUpPods := []executor.ThrottlePod{}, []executor.ThrottlePod{}
if !ac.Triggered && !(enableSchedule && ac.Restored) {
return throttlePods, throttleUpPods
}

allPods, err := s.podLister.List(labels.Everything())
if err != nil {
@@ -401,18 +405,18 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action

for _, pod := range allPods {
if ac.Triggered {
throttlePods = append(throttlePods, throttlePodConstruct(pod, stateMap, action))
throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown))
}
if enableSchedule && ac.Restored {
throttleUpPods = append(throttleUpPods, throttlePodConstruct(pod, stateMap, action))
throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp))
}
}

return throttlePods, throttleUpPods
}

func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction) []executor.EvictPod {
evictPods := []executor.EvictPod{}
func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []podinfo.PodContext {
evictPods := []podinfo.PodContext{}

if triggered {
allPods, err := s.podLister.List(labels.Everything())
@@ -421,10 +425,8 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
return evictPods
}

for _, v := range allPods {
var classAndPriority = executor.ClassAndPriority{PodQOSClass: v.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(v.Spec.Priority, 0)}
evictPods = append(evictPods, executor.EvictPod{DeletionGracePeriodSeconds: action.Spec.Eviction.TerminationGracePeriodSeconds,
PodKey: types.NamespacedName{Name: v.Name, Namespace: v.Namespace}, ClassAndPriority: classAndPriority})
for _, pod := range allPods {
evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict))
}
}
return evictPods
@@ -435,7 +437,7 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon

// If any rules are triggered, the avoidance is true,otherwise the avoidance is false.
// If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
var enableScheduling, avoidance, restore bool

defer func() {
@@ -472,36 +474,19 @@

if avoidance {
s.lastTriggeredTime = now
ae.ScheduleExecutor.DisableClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

if enableScheduling {
ae.ScheduleExecutor.RestoreClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

return enableScheduling
}

func throttlePodConstruct(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction) executor.ThrottlePod {
var throttlePod executor.ThrottlePod
var qosPriority = executor.ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)}

throttlePod.PodTypes = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
throttlePod.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio)
throttlePod.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio)

throttlePod.PodCPUUsage, throttlePod.ContainerCPUUsages = executor.GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod)
throttlePod.PodCPUShare, throttlePod.ContainerCPUShares = executor.GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod)
throttlePod.PodCPUQuota, throttlePod.ContainerCPUQuotas = executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod)
throttlePod.PodCPUPeriod, throttlePod.ContainerCPUPeriods = executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod)
throttlePod.PodQOSPriority = qosPriority

return throttlePod
}
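The body of podinfo.BuildPodBasicInfo, which replaces the removed throttlePodConstruct above, is not shown in this diff. A plausible sketch, assuming it generalizes throttlePodConstruct by tagging each PodContext with how the pod will be handled (the ClassifyType values ThrottleDown, ThrottleUp and Evict appear in the callers); field names follow the removed code, but helper locations and the exact body in crane may differ:

// Sketch only: relies on the surrounding package's imports (v1, common,
// ensuranceapi, types, utils, stypes) exactly as the removed function did.
func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries,
	action *ensuranceapi.AvoidanceAction, classify ClassifyType) PodContext {

	var p PodContext
	p.PodKey = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	p.ClassAndPriority = ClassAndPriority{
		PodQOSClass:        pod.Status.QOSClass,
		PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0),
	}
	p.PodType = classify

	// Usage is captured for every pod so the executor can stop as soon as
	// the accumulated released resource covers the waterline gap.
	p.PodCPUUsage, p.ContainerCPUUsages = GetPodUsage(
		string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod)

	switch classify {
	case ThrottleDown, ThrottleUp:
		p.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio)
		p.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio)
	case Evict:
		p.DeletionGracePeriodSeconds = action.Spec.Eviction.TerminationGracePeriodSeconds
	}
	return p
}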

func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) {
for _, t := range throttlePods {
if i := e.ThrottleDownPods.Find(t.PodTypes); i == -1 {
if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 {
e.ThrottleDownPods = append(e.ThrottleDownPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -513,9 +498,9 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}
}
}
for _, t := range throttleUpPods {

if i := e.ThrottleUpPods.Find(t.PodTypes); i == -1 {
for _, t := range throttleUpPods {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio {
Expand All @@ -541,3 +526,68 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}
}

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) {
if !ac.Triggered && !(enableSchedule && ac.Restored) {
return
}

if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleDownWaterLine == nil {
e.ThrottleDownWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
// Use a heap here, so we don't need to use <nepName>-<MetricRuleName> as value, just use <MetricRuleName>
if e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleDownWaterLine {
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}

if enableSchedule && ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
e.ThrottleUpWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleUpWaterLine {
klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}

func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
if !ac.Triggered {
return
}

for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.EvictWaterLine == nil {
e.EvictWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}

for waterLineMetric, waterlines := range e.EvictWaterLine {
klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}
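combineThrottleWaterLine and combineEvictWaterLine above push each triggered MetricRule value onto a per-metric heap, which is what lets the map key stay a plain MetricRuleName instead of a composite <nepName>-<MetricRuleName>. A minimal sketch of such a WaterLine heap, assuming a min-heap of resource.Quantity so the strictest (lowest) waterline for a metric is always on top; the real executor.WaterLine may differ in detail:

package main

import (
	"container/heap"
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// WaterLine is a min-heap of quantities implementing heap.Interface.
type WaterLine []resource.Quantity

func (w WaterLine) Len() int            { return len(w) }
func (w WaterLine) Swap(i, j int)       { w[i], w[j] = w[j], w[i] }
func (w WaterLine) Less(i, j int) bool  { return w[i].Cmp(w[j]) < 0 }
func (w *WaterLine) Push(x interface{}) { *w = append(*w, x.(resource.Quantity)) }
func (w *WaterLine) Pop() interface{} {
	old := *w
	n := len(old)
	x := old[n-1]
	*w = old[:n-1]
	return x
}

func main() {
	w := &WaterLine{}
	// Two NodeQOSEnsurancePolicies define different cpu waterlines; the heap
	// surfaces the binding (strictest) one without composite keys.
	heap.Push(w, resource.MustParse("2"))
	heap.Push(w, resource.MustParse("1500m"))
	fmt.Println((*w)[0].String()) // 1500m
}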
18 changes: 16 additions & 2 deletions pkg/ensurance/collector/cadvisor/cadvisor_linux.go
@@ -4,7 +4,9 @@
package cadvisor

import (
"math"
"net/http"
"reflect"
"strconv"
"time"

@@ -127,6 +129,16 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) {
for key, v := range containers {
containerId := utils.GetContainerIdFromKey(key)
containerName := utils.GetContainerNameFromPod(pod, containerId)
klog.V(6).Infof("Key is %s, containerId is %s, containerName is %s", key, containerId, containerName)

if reflect.DeepEqual(cadvisorapiv2.ContainerInfo{}, v) {
klog.Warning("ContainerInfo is nil")
} else {
if len(v.Stats) == 0 {
klog.Warning("ContainerInfo.Stats is empty")
}
}

// Filter the sandbox container
if (containerId != "") && (containerName == "") {
continue
@@ -145,10 +157,11 @@
}

if state, ok := c.latestContainersStates[key]; ok {
klog.V(8).Infof("For key %s, LatestContainersStates exist", key)
var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes)

cpuUsageSample, schedRunqueueTime := caculateCPUUsage(&v, &state)
if cpuUsageSample == 0 && schedRunqueueTime == 0 {
if cpuUsageSample == 0 && schedRunqueueTime == 0 || math.IsNaN(cpuUsageSample) {
continue
}
if hasExtRes {
Expand All @@ -160,7 +173,7 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) {
addSampleToStateMap(types.MetricNameContainerCpuQuota, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Quota), now), stateMap)
addSampleToStateMap(types.MetricNameContainerCpuPeriod, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Period), now), stateMap)

klog.V(10).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f", klog.KObj(pod), containerName, key, schedRunqueueTime)
klog.V(8).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f, container_cpu_total_usage %#v", klog.KObj(pod), containerName, key, schedRunqueueTime, cpuUsageSample)
}
containerStates[key] = ContainerState{stat: v, timestamp: now}
}
@@ -202,6 +215,7 @@ func caculateCPUUsage(info *cadvisorapiv2.ContainerInfo, state *ContainerState)
cpuUsageIncrease := info.Stats[0].Cpu.Usage.Total - state.stat.Stats[0].Cpu.Usage.Total
schedRunqueueTimeIncrease := info.Stats[0].Cpu.Schedstat.RunqueueTime - state.stat.Stats[0].Cpu.Schedstat.RunqueueTime
timeIncrease := info.Stats[0].Timestamp.UnixNano() - state.stat.Stats[0].Timestamp.UnixNano()

cpuUsageSample := float64(cpuUsageIncrease) / float64(timeIncrease)
schedRunqueueTime := float64(schedRunqueueTimeIncrease) * 1000 * 1000 / float64(timeIncrease)
return cpuUsageSample, schedRunqueueTime
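The math.IsNaN guard added to Collect exists because of this division: if cadvisor returns two stats sharing a timestamp, both cpuUsageIncrease and timeIncrease are zero, and in Go floating-point 0.0/0.0 yields NaN rather than a panic. A NaN sample would silently poison any aggregation it enters, so it must be dropped. A tiny demonstration:

package main

import (
	"fmt"
	"math"
)

func main() {
	var cpuUsageIncrease, timeIncrease uint64 // two samples with equal timestamps
	sample := float64(cpuUsageIncrease) / float64(timeIncrease)
	fmt.Println(sample, math.IsNaN(sample)) // NaN true
}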