Skip to content

Commit

Permalink
enhancement(eviction): load eviction support dynamic threshold (#129)
Browse files Browse the repository at this point in the history
* enhancement(eviction): evict by share pool status

* enhancement(eviction): load eviction plugin use readonly state

* chore(eviction): make test parallel

* chore(eviction): rename some function and add some metric

* enhancement(eviction): extract GetPodPoolMapFunc interface

* enhancement(eviction): ignore isolation pool

* refine implementation for load pressure

---------

Co-authored-by: shaowei.wayne <shaowei.wayne@bytedance.com>
  • Loading branch information
zzzzhhb and waynepeking348 authored Aug 1, 2023
1 parent 71cad3a commit 83bce0c
Show file tree
Hide file tree
Showing 17 changed files with 1,180 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
)

type ReclaimedResourceOptions struct {
EnableReclaim bool
ReservedResourceForReport general.ResourceList
MinReclaimedResourceForReport general.ResourceList
ReservedResourceForAllocate general.ResourceList
EnableReclaim bool
ReservedResourceForReport general.ResourceList
MinReclaimedResourceForReport general.ResourceList
ReservedResourceForAllocate general.ResourceList
ReservedResourceForReclaimedCores general.ResourceList

*cpuheadroom.CPUHeadroomOptions
*memoryheadroom.MemoryHeadroomOptions
Expand All @@ -53,6 +54,10 @@ func NewReclaimedResourceOptions() *ReclaimedResourceOptions {
v1.ResourceCPU: resource.MustParse("4"),
v1.ResourceMemory: resource.MustParse("5Gi"),
},
ReservedResourceForReclaimedCores: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("4"),
v1.ResourceMemory: resource.MustParse("0"),
},
CPUHeadroomOptions: cpuheadroom.NewCPUHeadroomOptions(),
MemoryHeadroomOptions: memoryheadroom.NewMemoryHeadroomOptions(),
}
Expand All @@ -70,6 +75,8 @@ func (o *ReclaimedResourceOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"min reclaimed resource report to cnr")
fs.Var(&o.ReservedResourceForAllocate, "reserved-resource-for-allocate",
"reserved reclaimed resource actually not allocate to reclaimed resource")
fs.Var(&o.ReservedResourceForReclaimedCores, "reserved-resource-for-reclaimed-cores",
"reserved resources for reclaimed_cores pods")

o.CPUHeadroomOptions.AddFlags(fss)
o.MemoryHeadroomOptions.AddFlags(fss)
Expand All @@ -82,6 +89,7 @@ func (o *ReclaimedResourceOptions) ApplyTo(c *reclaimedresource.ReclaimedResourc
c.ReservedResourceForReport = v1.ResourceList(o.ReservedResourceForReport)
c.MinReclaimedResourceForReport = v1.ResourceList(o.MinReclaimedResourceForReport)
c.ReservedResourceForAllocate = v1.ResourceList(o.ReservedResourceForAllocate)
c.MinReclaimedResourceForAllocate = v1.ResourceList(o.ReservedResourceForReclaimedCores)

errList = append(errList, o.CPUHeadroomOptions.ApplyTo(c.CPUHeadroomConfiguration))
errList = append(errList, o.MemoryHeadroomOptions.ApplyTo(c.MemoryHeadroomConfiguration))
Expand Down
13 changes: 11 additions & 2 deletions pkg/agent/evictionmanager/eviction_resp_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package evictionmanager

import (
"fmt"
"strings"

//nolint
"github.com/golang/protobuf/proto"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

// effectTagValueSeparator joins the effects of a condition into one string so
// that they can be reported as a single metric tag value.
const effectTagValueSeparator = "_"

// evictionRespCollector is used to collect eviction result from plugins, it also handles some logic such as dry run.
type evictionRespCollector struct {
conf *pkgconfig.Configuration
Expand Down Expand Up @@ -91,7 +94,7 @@ func (e *evictionRespCollector) collectEvictPods(dryRunPlugins []string, pluginN
e.getLogPrefix(dryRun), pluginName, evictPod.Pod.Namespace, evictPod.Pod.Name, evictPod.Reason, evictPod.ForceEvict)

if dryRun {
_ = e.emitter.StoreInt64(MetricsNameDryrunVictimPodCNT, 1, metrics.MetricTypeNameRaw,
_ = e.emitter.StoreInt64(MetricsNameDryRunVictimPodCNT, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "name", Val: pluginName},
metrics.MetricTag{Key: "victim_ns", Val: evictPod.Pod.Namespace},
metrics.MetricTag{Key: "victim_name", Val: evictPod.Pod.Name})
Expand Down Expand Up @@ -143,6 +146,12 @@ func (e *evictionRespCollector) collectMetThreshold(dryRunPlugins []string, plug
if resp.Condition != nil && resp.Condition.MetCondition {
general.Infof("%v plugin: %s requests to set condition: %s of type: %s",
e.getLogPrefix(dryRun), pluginName, resp.Condition.ConditionName, resp.Condition.ConditionType.String())
_ = e.emitter.StoreInt64(MetricsNameDryRunConditionCNT, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "name", Val: pluginName},
metrics.MetricTag{Key: "condition_name", Val: resp.Condition.ConditionName},
metrics.MetricTag{Key: "condition_type", Val: fmt.Sprint(resp.Condition.ConditionType)},
metrics.MetricTag{Key: "effects", Val: strings.Join(resp.Condition.Effects, effectTagValueSeparator)},
)

if !dryRun {
e.getCurrentConditions()[resp.Condition.ConditionName] = proto.Clone(resp.Condition).(*pluginapi.Condition)
Expand All @@ -162,7 +171,7 @@ func (e *evictionRespCollector) collectTopEvictionPods(dryRunPlugins []string, p
general.Infof("%v plugin %v request to evict topN pod %v/%v, reason: met threshold in scope [%v]",
e.getLogPrefix(dryRun), pluginName, pod.Namespace, pod.Name, threshold.EvictionScope)
if dryRun {
_ = e.emitter.StoreInt64(MetricsNameDryrunVictimPodCNT, 1, metrics.MetricTypeNameRaw,
_ = e.emitter.StoreInt64(MetricsNameDryRunVictimPodCNT, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "name", Val: pluginName},
metrics.MetricTag{Key: "victim_ns", Val: pod.Namespace},
metrics.MetricTag{Key: "victim_name", Val: pod.Name})
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/evictionmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const (
MetricsNameVictimPodCNT = "victims_cnt"
MetricsNameRunningPodCNT = "running_pod_cnt"
MetricsNameCandidatePodCNT = "candidate_pod_cnt"
MetricsNameDryrunVictimPodCNT = "dryrun_victims_cnt"
MetricsNameDryRunVictimPodCNT = "dryrun_victims_cnt"
MetricsNameDryRunConditionCNT = "dryrun_condition_cnt"
)

// LatestCNRGetter returns the latest CNR resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type cpuPressureEviction struct {
}

func NewCPUPressureEviction(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
conf *config.Configuration, state state.State) (agent.Component, error) {
conf *config.Configuration, state state.ReadonlyState) (agent.Component, error) {
plugin, err := newCPUPressureEviction(emitter, metaServer, conf, state)
if err != nil {
return nil, fmt.Errorf("create cpu eviction plugin failed: %s", err)
Expand All @@ -60,15 +60,20 @@ func NewCPUPressureEviction(emitter metrics.MetricEmitter, metaServer *metaserve
}

func newCPUPressureEviction(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
conf *config.Configuration, state state.State) (skeleton.GenericPlugin, error) {
conf *config.Configuration, state state.ReadonlyState) (skeleton.GenericPlugin, error) {
wrappedEmitter := emitter.WithTags(cpuPressureEvictionPluginName)

pressureLoadEviction, err := strategy.NewCPUPressureLoadEviction(emitter, metaServer, conf, state)
if err != nil {
return nil, fmt.Errorf("create CPUPressureLoadEviction plugin failed, err:%v", err)
}

plugin := &cpuPressureEviction{
forceEvictionList: []strategy.CPUPressureForceEviction{
strategy.NewCPUPressureSuppressionEviction(emitter, metaServer, conf, state),
},
thresholdEvictionList: []strategy.CPUPressureThresholdEviction{
strategy.NewCPUPressureLoadEviction(emitter, metaServer, conf, state),
pressureLoadEviction,
},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package strategy

import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

// getPodPoolMapFunc holds the GetPodPoolMapFunc implementation currently in effect
// for this package. It starts out as DefaultGetPodPoolMapFunc and is replaced
// through SetGetPodPoolMapFunc.
var getPodPoolMapFunc = DefaultGetPodPoolMapFunc

// PodPoolMap indexes owner pool information per container: the outer key is
// the pod UID, the inner key is the container name, and the value describes
// the pool that owns that container.
type PodPoolMap map[string]map[string]*ContainerOwnerPoolInfo

// PutContainerOwnerPoolInfo stores the owner pool information for the given
// container of the given pod. The per-pod map is created on first use, and any
// information stored earlier for the same container is overwritten.
func (p PodPoolMap) PutContainerOwnerPoolInfo(podUID string, containerName string, ownerPool string, poolSize int, isPool bool) {
	containers, found := p[podUID]
	if !found {
		// First container seen for this pod: allocate its inner map.
		containers = make(map[string]*ContainerOwnerPoolInfo, 1)
		p[podUID] = containers
	}

	containers[containerName] = &ContainerOwnerPoolInfo{
		OwnerPool: ownerPool,
		PoolSize:  poolSize,
		IsPool:    isPool,
	}
}

// GetPodPoolMapFunc builds a PodPoolMap from a pod fetcher and a read-only state.
// The returned map is keyed by pod UID; each value is a map keyed by container
// name whose value is the container info with its owner pool.
type GetPodPoolMapFunc func(pod.PodFetcher, state.ReadonlyState) PodPoolMap

// ContainerOwnerPoolInfo describes the pool that owns a container entry stored
// in a PodPoolMap.
type ContainerOwnerPoolInfo struct {
	// OwnerPool is the name of the owning pool. DefaultGetPodPoolMapFunc fills it
	// with the entry key for pool entries and with OwnerPoolName otherwise.
	OwnerPool string
	// PoolSize is filled by DefaultGetPodPoolMapFunc with the Size() of the
	// entry's allocation result.
	PoolSize int
	// IsPool is true when the entry was recorded from a pool entry rather than
	// from a container entry.
	IsPool bool
}

// SetGetPodPoolMapFunc provides a hook to change the implementation of
// GetPodPoolMapFunc used by this package.
//
// A nil f is ignored and the current implementation stays in place: storing nil
// would only surface later as a nil function call panic far away from the
// caller that caused it. The assignment itself is not synchronized.
func SetGetPodPoolMapFunc(f GetPodPoolMapFunc) {
	if f == nil {
		general.Infof("SetGetPodPoolMapFunc called with nil func, keep current implementation")
		return
	}

	general.Infof("SetGetPodPoolMapFunc called")
	getPodPoolMapFunc = f
}

// DefaultGetPodPoolMapFunc is the default GetPodPoolMapFunc implementation.
//
// It walks every entry returned by readonlyState.GetPodEntries() and records
// one ContainerOwnerPoolInfo per container:
//   - for pool entries, the entry key is used as the owner pool name and IsPool
//     is set to true;
//   - for other entries, the container's OwnerPoolName is used and IsPool is
//     false; containers with an empty OwnerPoolName are skipped.
//
// Nil container entries are skipped in both cases. The fetcher argument is not
// used by this implementation; it is part of the GetPodPoolMapFunc signature.
var DefaultGetPodPoolMapFunc GetPodPoolMapFunc = func(fetcher pod.PodFetcher, readonlyState state.ReadonlyState) PodPoolMap {
	result := make(PodPoolMap)

	for podUID, entry := range readonlyState.GetPodEntries() {
		// Whether the entry is a pool does not depend on the container being
		// visited, so evaluate it once per entry.
		isPool := entry.IsPoolEntry()

		for containerName, containerEntry := range entry {
			// Guard before any dereference: both branches below read fields of
			// containerEntry.
			if containerEntry == nil {
				continue
			}

			if isPool {
				// The entry key doubles as the owner pool name for pool entries.
				result.PutContainerOwnerPoolInfo(podUID, containerName, podUID, containerEntry.AllocationResult.Size(), true)
				continue
			}

			if containerEntry.OwnerPoolName == "" {
				general.Infof("skip get pool name for pod: %s, "+
					"container: %s with owner pool name: %s", podUID, containerName, containerEntry.OwnerPoolName)
				continue
			}

			result.PutContainerOwnerPoolInfo(podUID, containerName, containerEntry.OwnerPoolName, containerEntry.AllocationResult.Size(), false)
		}
	}

	return result
}
Loading

0 comments on commit 83bce0c

Please sign in to comment.