add log and debug
chenkaiyue committed Jun 11, 2022
1 parent 32e7075 commit 4532054
Showing 15 changed files with 218 additions and 128 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
@@ -89,7 +89,7 @@ func NewAgent(ctx context.Context,
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.GetStateFunc(), executeExcess)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess)
managers = appendManagerIfNotNil(managers, avoidanceManager)

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
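A minimal sketch of the wiring change above (stand-in types, not the real crane APIs): instead of handing the executor a GetStateFunc closure, the agent now passes the collector's State map itself, so the executor reads the same shared map the collectors fill in.

package main

import "fmt"

// TimeSeries is an illustrative stand-in for common.TimeSeries.
type TimeSeries struct{ Value float64 }

// collector mimics StateCollector: it owns the shared State map.
type collector struct {
	State map[string][]TimeSeries
}

// GetStateFunc is the old-style accessor: a closure that returns the map.
func (c *collector) GetStateFunc() func() map[string][]TimeSeries {
	return func() map[string][]TimeSeries { return c.State }
}

func main() {
	c := &collector{State: map[string][]TimeSeries{
		"cpu_total_usage": {{Value: 1.5}},
	}}

	// Before this commit: the executor called a getter on every use.
	getState := c.GetStateFunc()
	fmt.Println(getState()["cpu_total_usage"][0].Value) // 1.5

	// After this commit: the executor holds a reference to the same map
	// and sees updates the collectors write into it.
	state := c.State
	c.State["cpu_total_usage"] = []TimeSeries{{Value: 2.0}}
	fmt.Println(state["cpu_total_usage"][0].Value) // 2
}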
23 changes: 19 additions & 4 deletions pkg/ensurance/analyzer/analyzer.go
@@ -180,7 +180,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
}
}

klog.V(6).Infof("Analyze actionContexts: %v", actionContexts)
klog.V(6).Infof("Analyze actionContexts: %#v", actionContexts)

//step 3 : merge
avoidanceAction := s.merge(state, avoidanceMaps, actionContexts)
@@ -208,7 +208,7 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
for _, ts := range series {
triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value)

klog.V(6).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
klog.V(4).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
object.MetricRule.Name,
ts.Samples[0].Value,
common.GetValueByName(ts.Labels, common.LabelNamePodNamespace),
@@ -238,7 +238,7 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu
//step2: check if triggered for NodeQOSEnsurance
threshold := s.trigger(series, object)

klog.V(4).Infof("for NodeQOS %s, metrics reach the threshold: %v", key, threshold)
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &ac)
@@ -314,6 +314,9 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
}
}
ae.StateMap = stateMap

klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor)

return ae
}
@@ -495,8 +498,8 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}
}
}
for _, t := range throttleUpPods {

for _, t := range throttleUpPods {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
@@ -543,6 +546,10 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont
}
}

for waterLineMetric, waterlines := range e.ThrottleDownWaterLine {
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}

if enableSchedule && ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
@@ -556,6 +563,10 @@
}
}
}

for waterLineMetric, waterlines := range e.ThrottleUpWaterLine {
klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}

func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
@@ -571,5 +582,9 @@ func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}

for waterLineMetric, waterlines := range e.EvictWaterLine {
klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}
}
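For reference, a standalone illustration of the %#v verb the new analyzer log lines rely on (the type below is made up): unlike %v, it prints the Go-syntax representation with type and field names, which is what makes the executor and waterline dumps above readable.

package main

import "fmt"

// waterLine is an illustrative stand-in for a waterline entry.
type waterLine struct {
	Metric string
	Value  int64
}

func main() {
	w := waterLine{Metric: "cpu_total_usage", Value: 5000}
	fmt.Printf("%v\n", w)  // {cpu_total_usage 5000}
	fmt.Printf("%#v\n", w) // main.waterLine{Metric:"cpu_total_usage", Value:5000}
}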
18 changes: 16 additions & 2 deletions pkg/ensurance/collector/cadvisor/cadvisor_linux.go
@@ -4,7 +4,9 @@
package cadvisor

import (
"math"
"net/http"
"reflect"
"strconv"
"time"

@@ -127,6 +129,16 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) {
for key, v := range containers {
containerId := utils.GetContainerIdFromKey(key)
containerName := utils.GetContainerNameFromPod(pod, containerId)
klog.V(6).Infof("Key is %s, containerId is %s, containerName is %s", key, containerId, containerName)

if reflect.DeepEqual(cadvisorapiv2.ContainerInfo{}, v) {
klog.Warning("ContainerInfo is nil")
} else {
if len(v.Stats) == 0 {
klog.Warning("ContainerInfo.Stats is empty")
}
}

// Filter the sandbox container
if (containerId != "") && (containerName == "") {
continue
@@ -145,10 +157,11 @@
}

if state, ok := c.latestContainersStates[key]; ok {
klog.V(8).Infof("For key %s, LatestContainersStates exist", key)
var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes)

cpuUsageSample, schedRunqueueTime := caculateCPUUsage(&v, &state)
if cpuUsageSample == 0 && schedRunqueueTime == 0 {
if cpuUsageSample == 0 && schedRunqueueTime == 0 || math.IsNaN(cpuUsageSample) {
continue
}
if hasExtRes {
@@ -160,7 +173,7 @@
addSampleToStateMap(types.MetricNameContainerCpuQuota, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Quota), now), stateMap)
addSampleToStateMap(types.MetricNameContainerCpuPeriod, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Period), now), stateMap)

klog.V(10).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f", klog.KObj(pod), containerName, key, schedRunqueueTime)
klog.V(8).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f, container_cpu_total_usage %#v", klog.KObj(pod), containerName, key, schedRunqueueTime, cpuUsageSample)
}
containerStates[key] = ContainerState{stat: v, timestamp: now}
}
@@ -202,6 +215,7 @@ func caculateCPUUsage(info *cadvisorapiv2.ContainerInfo, state *ContainerState)
cpuUsageIncrease := info.Stats[0].Cpu.Usage.Total - state.stat.Stats[0].Cpu.Usage.Total
schedRunqueueTimeIncrease := info.Stats[0].Cpu.Schedstat.RunqueueTime - state.stat.Stats[0].Cpu.Schedstat.RunqueueTime
timeIncrease := info.Stats[0].Timestamp.UnixNano() - state.stat.Stats[0].Timestamp.UnixNano()

cpuUsageSample := float64(cpuUsageIncrease) / float64(timeIncrease)
schedRunqueueTime := float64(schedRunqueueTimeIncrease) * 1000 * 1000 / float64(timeIncrease)
return cpuUsageSample, schedRunqueueTime
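A small sketch of why the math.IsNaN guard above matters (numbers are made up): cpuUsageSample is a usage delta divided by a timestamp delta, so two stats with identical timestamps give 0/0, which is NaN for float64 and would otherwise slip past the == 0 check.

package main

import (
	"fmt"
	"math"
)

// usageSample mirrors the shape of caculateCPUUsage: usage delta over time delta.
func usageSample(usageDelta, timeDeltaNano int64) float64 {
	return float64(usageDelta) / float64(timeDeltaNano)
}

func main() {
	samples := []float64{
		usageSample(2_000_000, 1_000_000_000), // normal case
		usageSample(0, 0),                     // identical timestamps -> 0/0 -> NaN
	}
	for _, s := range samples {
		if s == 0 || math.IsNaN(s) { // skip empty or undefined samples
			fmt.Println("skipping sample:", s)
			continue
		}
		fmt.Printf("cpu usage sample: %.6f\n", s)
	}
}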
23 changes: 8 additions & 15 deletions pkg/ensurance/collector/collector.go
@@ -36,7 +36,7 @@ type StateCollector struct {
AnalyzerChann chan map[string][]common.TimeSeries
NodeResourceChann chan map[string][]common.TimeSeries
PodResourceChann chan map[string][]common.TimeSeries
state map[string][]common.TimeSeries
State map[string][]common.TimeSeries
rw sync.RWMutex
}

@@ -45,6 +45,7 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura
analyzerChann := make(chan map[string][]common.TimeSeries)
nodeResourceChann := make(chan map[string][]common.TimeSeries)
podResourceChann := make(chan map[string][]common.TimeSeries)
State := make(map[string][]common.TimeSeries)
return &StateCollector{
nodeName: nodeName,
nepLister: nepLister,
@@ -59,6 +60,7 @@
collectors: &sync.Map{},
cadvisorManager: manager,
exclusiveCPUSet: exclusiveCPUSet,
State: State,
}
}

@@ -113,8 +115,6 @@ func (s *StateCollector) Collect(waterLine bool) {
wg := sync.WaitGroup{}
start := time.Now()

var data = make(map[string][]common.TimeSeries)

s.collectors.Range(func(key, value interface{}) bool {
c := value.(Collector)

@@ -132,26 +132,21 @@
}
s.rw.Unlock()
}
}(c, data)
}(c, s.State)

return true
})

wg.Wait()

// If Collect is not called by waterline related logic but StateCollector.Run, AnalyzerChann should not get update, which will trigger recursive analyzes and executes
if !waterLine {
s.AnalyzerChann <- data
}

s.state = data
s.AnalyzerChann <- s.State

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
s.NodeResourceChann <- data
s.NodeResourceChann <- s.State
}

if podResource := utilfeature.DefaultFeatureGate.Enabled(features.CranePodResource); podResource {
s.PodResourceChann <- data
s.PodResourceChann <- s.State
}
}

@@ -255,8 +250,6 @@ func CheckMetricNameExist(name string) bool {
func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries {
return func() map[string][]common.TimeSeries {
s.Collect(true)
s.rw.RLock()
defer s.rw.RUnlock()
return s.state
return s.State
}
}
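A rough sketch of the pattern Collect uses after this change (illustrative names, not the real collector set): each collector goroutine writes its result into the one shared State map under the lock, and that same map is then pushed to the analyzer and resource channels instead of a per-call data map.

package main

import (
	"fmt"
	"sync"
)

// stateCollector is a stand-in for StateCollector with only the shared map.
type stateCollector struct {
	rw    sync.RWMutex
	State map[string][]float64
}

// collectOne plays the role of a single Collector's result being merged in.
func (s *stateCollector) collectOne(metric string, values []float64, wg *sync.WaitGroup) {
	defer wg.Done()
	s.rw.Lock()
	s.State[metric] = values
	s.rw.Unlock()
}

func main() {
	s := &stateCollector{State: make(map[string][]float64)}

	var wg sync.WaitGroup
	wg.Add(2)
	go s.collectOne("cpu_total_usage", []float64{1.2, 1.4}, &wg)
	go s.collectOne("memory_total_usage", []float64{512, 600}, &wg)
	wg.Wait()

	// Downstream consumers receive the same shared map.
	analyzerChann := make(chan map[string][]float64, 1)
	analyzerChann <- s.State
	fmt.Println(len(<-analyzerChann), "metrics collected") // 2 metrics collected
}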
2 changes: 1 addition & 1 deletion pkg/ensurance/collector/nodelocal/nodelocal.go
@@ -81,7 +81,7 @@ func (n *NodeLocal) Collect() (map[string][]common.TimeSeries, error) {
}
}

klog.V(10).Info("Node local collecting, status: %v", status)
klog.V(6).Info("Node local collecting, status: %#v", status)

return status, nil
}
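One note on the nodelocal line above: klog's Info (the call shown) takes print-style arguments rather than a format string, so a %#v verb is only expanded by Infof. The difference, sketched with fmt standing in for klog:

package main

import "fmt"

// nodeStatus is an illustrative stand-in for the collected status map.
type nodeStatus struct{ CPU float64 }

func main() {
	status := nodeStatus{CPU: 1.5}

	// Print-style (what Info does): the verb is left as literal text.
	fmt.Println(fmt.Sprint("Node local collecting, status: %#v", status))

	// Printf-style (what Infof does): the verb is expanded.
	fmt.Println(fmt.Sprintf("Node local collecting, status: %#v", status))
}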
43 changes: 26 additions & 17 deletions pkg/ensurance/executor/cpu_usage.go
@@ -43,13 +43,15 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle

// Throttle for CPU metrics

klog.V(6).Infof("index %d, containerusage is %#v", index, ThrottleDownPods[index].ContainerCPUUsages)

for _, v := range ThrottleDownPods[index].ContainerCPUUsages {
// pause container to skip
if v.ContainerName == "" {
continue
}

klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName)
klog.V(4).Infof("ThrottleExecutor begin to avoid container %s/%s", klog.KObj(pod), v.ContainerName)

containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId)
if err != nil {
@@ -88,8 +90,8 @@
}
}

klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f",
containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value)
klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f",
containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value)

if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) {
err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)})
Expand All @@ -98,11 +100,14 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle
continue
} else {
klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.",
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew)
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value)

released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value)
klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].PodKey.String(), container.Name, released[CpuUsage])

totalReleasedResource.Add(released)
}
}
released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value)
totalReleasedResource.Add(released)
}
return
}
@@ -164,8 +169,8 @@

}

klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f",
klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value)
klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f",
containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value)

if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) {

@@ -182,10 +187,14 @@
errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String())
continue
}
klog.V(4).Infof("ThrottleExecutor restore pod %s, container %s, set cpu quota %.2f, .",
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value)
released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value)
klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].PodKey, container.Name, released[CpuUsage])

totalReleasedResource.Add(released)
}
}
released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value)
totalReleasedResource.Add(released)
}

return
@@ -194,6 +203,10 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod
func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) {
wg.Add(1)

// Calculate release resources
released = ConstructCpuUsageRelease(EvictPods[index], 0.0, 0.0)
totalReleasedResource.Add(released)

go func(evictPod podinfo.PodContext) {
defer wg.Done()

@@ -213,22 +226,18 @@
metrics.ExecutorEvictCountsInc()

klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod))

// Calculate release resources
released = ConstructCpuUsageRelease(evictPod, 0.0, 0.0)
totalReleasedResource.Add(released)
}(EvictPods[index])
return
}

func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource {
if pod.PodType == podinfo.Evict {
return ReleaseResource{
CpuUsage: pod.PodCPUUsage,
CpuUsage: pod.PodCPUUsage * CpuQuotaCoefficient,
}
}
if pod.PodType == podinfo.ThrottleDown {
reduction := currentContainerCpuUsage - containerCPUQuotaNew
reduction := (currentContainerCpuUsage - containerCPUQuotaNew) * CpuQuotaCoefficient
if reduction > 0 {
return ReleaseResource{
CpuUsage: reduction,
Expand All @@ -237,7 +246,7 @@ func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, curr
return ReleaseResource{}
}
if pod.PodType == podinfo.ThrottleUp {
reduction := containerCPUQuotaNew - currentContainerCpuUsage
reduction := (containerCPUQuotaNew - currentContainerCpuUsage) * CpuQuotaCoefficient
if reduction > 0 {
return ReleaseResource{
CpuUsage: reduction,
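A toy version of the throttle-down release math above (the constant is a guess; the real CpuQuotaCoefficient lives in the executor package and its value is not shown in this diff): the released CPU is the positive gap between current usage and the new quota, scaled by the coefficient.

package main

import "fmt"

// cpuQuotaCoefficient is a placeholder value; the real constant is defined elsewhere.
const cpuQuotaCoefficient = 1000.0

// releaseResource stands in for executor.ReleaseResource.
type releaseResource map[string]float64

// throttleDownRelease mirrors the ThrottleDown branch of ConstructCpuUsageRelease.
func throttleDownRelease(currentUsage, newQuota float64) releaseResource {
	reduction := (currentUsage - newQuota) * cpuQuotaCoefficient
	if reduction > 0 {
		return releaseResource{"cpu_total_usage": reduction}
	}
	return releaseResource{}
}

func main() {
	fmt.Println(throttleDownRelease(1.75, 1.25)) // map[cpu_total_usage:500]
	fmt.Println(throttleDownRelease(1.0, 1.25))  // map[] - nothing released when quota exceeds usage
}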