Skip to content

Commit

Permalink
koordlet: fix prodReclaimablePredictor result to avoid influence of o…
Browse files Browse the repository at this point in the history
…versold

Signed-off-by: lijunxin <lijunxin.ljx@alibaba-inc.com>
  • Loading branch information
lijunxin559 committed Jan 21, 2025
1 parent 79036cf commit 6cfc773
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 121 deletions.
8 changes: 5 additions & 3 deletions pkg/koordlet/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ const (

PredictorKey = "predictor"

StatusKey = "status"
StatusSucceed = "succeeded"
StatusFailed = "failed"
StatusKey = "status"
StatusSucceed = "succeeded"
StatusFailed = "failed"
StatusSucceedBool = 0
StatusFailedBool = 1

EvictionReasonKey = "reason"
BESuppressTypeKey = "type"
Expand Down
16 changes: 16 additions & 0 deletions pkg/koordlet/metrics/resource_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var (
Help: "the node reclaimable of different priorities resources updated by koordinator",
}, []string{NodeKey, PriorityKey, ResourceKey, UnitKey})

NodeResourcePriorityReclaimableStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "node_resource_priority_reclaimable_status",
Help: "status of node reclaimable of different priorities resources updated by koordinator",
}, []string{NodeKey, PriorityKey})

ContainerResourceRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "container_resource_requests",
Expand All @@ -50,6 +56,7 @@ var (
ResourceSummaryCollectors = []prometheus.Collector{
NodeResourceAllocatable,
NodeResourcePriorityReclaimable,
NodeResourcePriorityReclaimableStatus,
ContainerResourceRequests,
ContainerResourceLimits,
}
Expand All @@ -76,6 +83,15 @@ func RecordNodeResourcePriorityReclaimable(resourceName string, unit string, pri
NodeResourcePriorityReclaimable.With(labels).Set(value)
}

// RecordNodeResourcePriorityReclaimableStatus sets the
// node_resource_priority_reclaimable_status gauge to value, labelled with the
// labels produced by genNodeLabels plus the given priority.
// Nothing is recorded when genNodeLabels returns nil.
// NOTE(review): value is presumably StatusSucceedBool (0) or StatusFailedBool (1) — confirm against callers.
func RecordNodeResourcePriorityReclaimableStatus(priority string, value float64) {
	nodeLabels := genNodeLabels()
	if nodeLabels != nil {
		nodeLabels[PriorityKey] = priority
		NodeResourcePriorityReclaimableStatus.With(nodeLabels).Set(value)
	}
}

func RecordContainerResourceRequests(resourceName string, unit string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) {
labels := genNodeLabels()
if labels == nil {
Expand Down
116 changes: 88 additions & 28 deletions pkg/koordlet/prediction/peak_predictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ import (
// PredictorType defines constants for different types of predictors.
type PredictorType int

// PredictorContext carries the per-call inputs that a PredictorFactory passes
// to the predictors it creates.
type PredictorContext struct {
// Node is copied into the created predictors by predictorFactory.New; the
// reclaimable predictors return an error from GetResult when it is nil.
Node *v1.Node
}

const (
// ProdReclaimablePredictor represents the type of a reclaimable production predictor.
// It is the first value of the iota sequence (0) and is the type handled by
// predictorFactory.New.
ProdReclaimablePredictor PredictorType = iota
)

// PredictorFactory is an interface for creating predictors of different types.
type PredictorFactory interface {
New(PredictorType) Predictor
New(predictorType PredictorType, context PredictorContext) Predictor
}

type Predictor interface {
Expand All @@ -65,19 +69,22 @@ func NewPredictorFactory(predictServer PredictServer, coldStartDuration time.Dur
}

// New creates a new instance of a predictor based on the given type.
func (f *predictorFactory) New(t PredictorType) Predictor {
func (f *predictorFactory) New(t PredictorType, context PredictorContext) Predictor {
switch t {
case ProdReclaimablePredictor:
podPredictor := &podReclaimablePredictor{
predictServer: f.predictServer,
node: context.Node,
coldStartDuration: f.coldStartDuration,
safetyMarginPercent: f.safetyMarginPercent,
podFilterFn: isPodReclaimableForProd,
reclaimable: util.NewZeroResourceList(),
unReclaimable: util.NewZeroResourceList(),
pods: make(map[string]bool),
}
priorityPredictor := &priorityReclaimablePredictor{
predictServer: f.predictServer,
node: context.Node,
safetyMarginPercent: f.safetyMarginPercent,
priorityClassFilterFn: isPriorityClassReclaimableForProd,
reclaimRequest: util.NewZeroResourceList(),
Expand Down Expand Up @@ -119,7 +126,7 @@ func NewEmptyPredictorFactory() PredictorFactory {
type emptyPredictorFactory struct {
}

func (f *emptyPredictorFactory) New(t PredictorType) Predictor {
func (f *emptyPredictorFactory) New(t PredictorType, context PredictorContext) Predictor {
return &emptyPredictor{}
}

Expand All @@ -129,12 +136,13 @@ var _ Predictor = (*podReclaimablePredictor)(nil)
// e.g. A podReclaimablePredictor for Prod pods calculates the result based on the sum of the percentile of Prod pods.
type podReclaimablePredictor struct {
predictServer PredictServer
node *v1.Node
coldStartDuration time.Duration
safetyMarginPercent int
podFilterFn func(pod *v1.Pod) bool // return true if the pod is reclaimable

reclaimable v1.ResourceList
pods map[string]bool
reclaimable v1.ResourceList
unReclaimable v1.ResourceList
pods map[string]bool
}

// GetPredictorName is used to obtain the predictor name.
Expand Down Expand Up @@ -180,19 +188,36 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error {
podCPURequest := podRequests[v1.ResourceCPU]
podMemoryRequest := podRequests[v1.ResourceMemory]

// calculate the reclaimable resources: reclaimable = podRequest - peak
// calculate the unReclaimable resources: unReclaimable = peak
reclaimableCPUMilli := int64(0)
reclaimableMemoryBytes := int64(0)

unReclaimableCPUMilli := int64(0)
unReclaimableMemoryBytes := int64(0)
ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100
if p95CPU, ok := p95Resources[v1.ResourceCPU]; ok {
peakCPU := util.MultiplyMilliQuant(p95CPU, ratioAfterSafetyMargin)
unReclaimableCPUMilli = peakCPU.MilliValue()
reclaimableCPUMilli = podCPURequest.MilliValue() - peakCPU.MilliValue()
}
if p98Memory, ok := p98Resources[v1.ResourceMemory]; ok {
peakMemory := util.MultiplyQuant(p98Memory, ratioAfterSafetyMargin)
unReclaimableMemoryBytes = peakMemory.Value()
reclaimableMemoryBytes = podMemoryRequest.Value() - peakMemory.Value()
}

// update the unReclaimable resources
cpu := p.unReclaimable[v1.ResourceCPU]
unReclaimableCPU := resource.NewMilliQuantity(unReclaimableCPUMilli, resource.DecimalSI)
cpu.Add(*unReclaimableCPU)
p.unReclaimable[v1.ResourceCPU] = cpu

memory := p.unReclaimable[v1.ResourceMemory]
unReclaimableMemory := resource.NewQuantity(unReclaimableMemoryBytes, resource.BinarySI)
memory.Add(*unReclaimableMemory)
p.unReclaimable[v1.ResourceMemory] = memory

// update the reclaimableCPUMilli resources
if reclaimableCPUMilli > 0 {
cpu := p.reclaimable[v1.ResourceCPU]
reclaimableCPU := resource.NewMilliQuantity(reclaimableCPUMilli, resource.DecimalSI)
Expand All @@ -210,10 +235,21 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error {
}

// GetResult returns the predicted resource list for the added pods.
// The result is the sum of the reclaimable resources of the added pods.
func (p *podReclaimablePredictor) GetResult() (v1.ResourceList, error) {
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(p.reclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(p.reclaimable.Memory().Value()))
return p.reclaimable, nil
// if failed to get node info, stop the reclaimPredictor
if p.node == nil {
return nil, fmt.Errorf("failed to get podReclaimablePredictor result for node is nil")
}
nodeAllocatable, err := getNodeAllocatable(p.node)
if err != nil {
return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err)
}
fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, p.unReclaimable)
fixReclaimable = util.MinResourceList(fixReclaimable, p.reclaimable)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value()))
return fixReclaimable, nil
}

var _ Predictor = (*priorityReclaimablePredictor)(nil)
Expand All @@ -223,20 +259,21 @@ var _ Predictor = (*priorityReclaimablePredictor)(nil)
// Prod-tier and the system components parts.
type priorityReclaimablePredictor struct {
// predictServer supplies the system and per-priority-class predictions read in GetResult.
predictServer PredictServer
// node is the node whose allocatable bounds the result; GetResult returns an error when it is nil.
node *v1.Node
// safetyMarginPercent is the extra percentage added on top of the summed predicted peak.
safetyMarginPercent int
priorityClassFilterFn func(p extension.PriorityClass) bool // return true if the priority class is reclaimable

// reclaimRequest accumulates the CPU and memory requests of the pods accepted by AddPod.
reclaimRequest v1.ResourceList
}

// GetPredictorName is used to obtain the predictor name.
func (n *priorityReclaimablePredictor) GetPredictorName() string {
func (p *priorityReclaimablePredictor) GetPredictorName() string {
return "priorityReclaimablePredictor"
}

func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
func (p *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
priorityClass := extension.GetPodPriorityClassWithDefault(pod)
if !n.priorityClassFilterFn(priorityClass) {
if !p.priorityClassFilterFn(priorityClass) {
klog.V(6).Infof("priorityReclaimablePredictor skip pod %s whose priority %s is not reclaimable",
pod.UID, priorityClass)
return nil
Expand All @@ -250,31 +287,39 @@ func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
}

podRequests := util.GetPodRequest(pod, v1.ResourceCPU, v1.ResourceMemory)
n.reclaimRequest = quotav1.Add(n.reclaimRequest, podRequests)
p.reclaimRequest = quotav1.Add(p.reclaimRequest, podRequests)

return nil
}

func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
func (p *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
// if failed to get node info, stop the reclaimPredictor
if p.node == nil {
return nil, fmt.Errorf("failed to get priorityReclaimablePredictor result for node is nil")
}
nodeAllocatable, err := getNodeAllocatable(p.node)
if err != nil {
return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err)
}
// get sys prediction
sysResult, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)})
sysResult, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)})
if err != nil {
return nil, fmt.Errorf("failed to get prediction of sys, err: %w", err)
}
sysResultForCPU := sysResult.Data["p95"]
sysResultForMemory := sysResult.Data["p98"]
reclaimPredict := v1.ResourceList{
unReclaimable := v1.ResourceList{
v1.ResourceCPU: *sysResultForCPU.Cpu(),
v1.ResourceMemory: *sysResultForMemory.Memory(),
}

// get reclaimable priority class prediction
for _, priorityClass := range extension.KnownPriorityClasses {
if !n.priorityClassFilterFn(priorityClass) {
if !p.priorityClassFilterFn(priorityClass) {
continue
}

result, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))})
result, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))})
if err != nil {
return nil, fmt.Errorf("failed to get prediction of priority %s, err: %s", priorityClass, err)
}
Expand All @@ -285,21 +330,24 @@ func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
v1.ResourceCPU: *resultForCPU.Cpu(),
v1.ResourceMemory: *resultForMemory.Memory(),
}
reclaimPredict = quotav1.Add(reclaimPredict, predictResource)
unReclaimable = quotav1.Add(unReclaimable, predictResource)
}

// scale with the safety margin
ratioAfterSafetyMargin := float64(100+n.safetyMarginPercent) / 100
reclaimPredict = v1.ResourceList{
v1.ResourceCPU: util.MultiplyMilliQuant(*reclaimPredict.Cpu(), ratioAfterSafetyMargin),
v1.ResourceMemory: util.MultiplyQuant(*reclaimPredict.Memory(), ratioAfterSafetyMargin),
ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100
unReclaimable = v1.ResourceList{
v1.ResourceCPU: util.MultiplyMilliQuant(*unReclaimable.Cpu(), ratioAfterSafetyMargin),
v1.ResourceMemory: util.MultiplyQuant(*unReclaimable.Memory(), ratioAfterSafetyMargin),
}

// reclaimable[P] := max(request[P] - peak[P], 0)
reclaimable := quotav1.Max(quotav1.Subtract(n.reclaimRequest, reclaimPredict), util.NewZeroResourceList())
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, n.GetPredictorName(), float64(reclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, n.GetPredictorName(), float64(reclaimable.Memory().Value()))
return reclaimable, nil
reclaimable := quotav1.Max(quotav1.Subtract(p.reclaimRequest, unReclaimable), util.NewZeroResourceList())
// fixReclaimable[P] := min(nodeAllocatable[P]-unReclaimable[P],reclaimable[P])
fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, unReclaimable)
fixReclaimable = util.MinResourceList(fixReclaimable, reclaimable)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value()))
return fixReclaimable, nil
}

var _ Predictor = (*minPredictor)(nil)
Expand Down Expand Up @@ -356,3 +404,15 @@ func isPodReclaimableForProd(pod *v1.Pod) bool {
// isPriorityClassReclaimableForProd reports whether the given priority class
// is one of the classes treated as reclaimable for Prod: extension.PriorityProd
// or extension.PriorityNone.
func isPriorityClassReclaimableForProd(priorityClass extension.PriorityClass) bool {
	switch priorityClass {
	case extension.PriorityProd, extension.PriorityNone:
		return true
	default:
		return false
	}
}

// getNodeAllocatable returns the allocatable resources of the node.
//
// The raw allocatable read from the node annotations is preferred when the
// lookup succeeds with a non-nil result; otherwise node.Status.Allocatable is
// used. An error is returned when node is nil or when neither source yields a
// resource list.
func getNodeAllocatable(node *v1.Node) (v1.ResourceList, error) {
	if node == nil {
		return nil, fmt.Errorf("invalid node, for node is nil")
	}
	// Best effort: any error or nil result from the annotation lookup falls
	// back to the status field instead of failing the whole call.
	if res, err := extension.GetNodeRawAllocatable(node.Annotations); err == nil && res != nil {
		return res, nil
	}
	if node.Status.Allocatable == nil {
		return nil, fmt.Errorf("invalid node, for node.status.allocatable is nil")
	}
	return node.Status.Allocatable, nil
}
Loading

0 comments on commit 6cfc773

Please sign in to comment.