Binpacking can exit without packing all the pods #4970

Merged (3 commits, Jun 20, 2022)
Changes from all commits
7 changes: 7 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -185,4 +185,11 @@ type AutoscalingOptions struct {
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.
// Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit
// scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop.
MaxNodesPerScaleUp int
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
}
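
The two new fields bound the work done by a single binpacking pass rather than rate-limiting scale-up itself. As a rough illustration, here is a minimal sketch of populating them directly; the concrete values and the standalone main are assumptions for illustration, not part of this PR (in the autoscaler these options are normally filled in from command-line flags).

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Hypothetical limits: at most 1000 simulated nodes and 10s of binpacking
	// per NodeGroup in a single scale-up estimation.
	opts := config.AutoscalingOptions{
		MaxNodesPerScaleUp:             1000,
		MaxNodeGroupBinpackingDuration: 10 * time.Second,
	}
	fmt.Printf("binpacking limits: %d nodes, %v per node group\n",
		opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration)
}
```
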
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -112,7 +112,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.ExpanderStrategy = expanderStrategy
}
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName)
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
if err != nil {
return err
}
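
NewEstimatorBuilder now takes an EstimationLimiter in addition to the estimator name and captures it in the returned builder. The estimator.go change itself is not part of this excerpt; the sketch below shows how that wiring plausibly looks, inferred from this call site and from the NewBinpackingNodeEstimator signature later in the diff (the BinpackingEstimatorName constant and the exact error handling are assumptions).

```go
package estimator

import (
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// EstimatorBuilder creates a new estimator object (sketch; the real type
// lives in cluster-autoscaler/estimator, outside this excerpt).
type EstimatorBuilder func(simulator.PredicateChecker, simulator.ClusterSnapshot) Estimator

// NewEstimatorBuilder captures the limiter so that every binpacking
// estimator built from it consults the same limiter instance.
func NewEstimatorBuilder(name string, limiter EstimationLimiter) (EstimatorBuilder, error) {
	switch name {
	case BinpackingEstimatorName:
		return func(
			predicateChecker simulator.PredicateChecker,
			clusterSnapshot simulator.ClusterSnapshot) Estimator {
			return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter)
		}, nil
	}
	return nil, fmt.Errorf("unknown estimator: %s", name)
}
```
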
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -312,7 +312,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG

if len(option.Pods) > 0 {
estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
option.NodeCount = estimator.Estimate(option.Pods, nodeInfo)
option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
}

return option, nil
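
This call site reflects the new Estimate signature: in addition to the node count, it now returns the pods that were actually packed, so option.Pods shrinks to what a partial scale-up can help. The Estimator interface itself is not shown in this excerpt; the following is a sketch of what it presumably looks like after this change, matching the BinpackingNodeEstimator method further down.

```go
package estimator

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// Estimator estimates how many nodes based on a given template are needed to
// schedule the given pods. It returns the node count together with the subset
// of pods that fit on those nodes, which may be smaller than the input when a
// limiter cuts binpacking short.
type Estimator interface {
	Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
}
```
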
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/test/common.go
@@ -165,7 +165,7 @@ func NewScaleTestAutoscalingContext(
}
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName)
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0))
predicateChecker, err := simulator.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err
75 changes: 56 additions & 19 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -22,6 +22,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
@@ -38,15 +39,18 @@ type podInfo struct {
type BinpackingNodeEstimator struct {
predicateChecker simulator.PredicateChecker
clusterSnapshot simulator.ClusterSnapshot
limiter EstimationLimiter
}

// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(
predicateChecker simulator.PredicateChecker,
clusterSnapshot simulator.ClusterSnapshot) *BinpackingNodeEstimator {
clusterSnapshot simulator.ClusterSnapshot,
limiter EstimationLimiter) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{
predicateChecker: predicateChecker,
clusterSnapshot: clusterSnapshot,
limiter: limiter,
}
}

@@ -57,69 +61,102 @@ func NewBinpackingNodeEstimator(
// still be maintained.
// It is assumed that all pods from the given list can fit to nodeTemplate.
// Returns the number of nodes needed to accommodate all pods from the list.
func (estimator *BinpackingNodeEstimator) Estimate(
func (e *BinpackingNodeEstimator) Estimate(
pods []*apiv1.Pod,
nodeTemplate *schedulerframework.NodeInfo) int {
nodeTemplate *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) {

e.limiter.StartEstimation(pods, nodeGroup)
defer e.limiter.EndEstimation()

podInfos := calculatePodScore(pods, nodeTemplate)
sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })

newNodeNames := make(map[string]bool)
newNodesWithPods := make(map[string]bool)

if err := estimator.clusterSnapshot.Fork(); err != nil {
if err := e.clusterSnapshot.Fork(); err != nil {
klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err)
return 0
return 0, nil
}
defer func() {
if err := estimator.clusterSnapshot.Revert(); err != nil {
if err := e.clusterSnapshot.Revert(); err != nil {
klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err)
}
}()

newNodeNameIndex := 0
scheduledPods := []*apiv1.Pod{}
lastNodeName := ""

for _, podInfo := range podInfos {
found := false

nodeName, err := estimator.predicateChecker.FitsAnyNodeMatching(estimator.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return newNodeNames[nodeInfo.Node().Name]
})
if err == nil {
found = true
if err := estimator.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil {
if err := e.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, nodeName, err)
return 0
return 0, nil
}
scheduledPods = append(scheduledPods, podInfo.pod)
newNodesWithPods[nodeName] = true
}

if !found {
// Stop binpacking if we reach the limit of nodes we can add.
// We return the result of the binpacking that we already performed.
if !e.limiter.PermissionToAddNode() {
break
}

// If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule
// on a new node either. There is no point adding more nodes to snapshot in such case, especially because of
// performance cost each extra node adds to future FitsAnyNodeMatching calls.
if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
continue
}

Member:
nit: Limiting binpacking size/time and checking if any pod was scheduled to a new node are separate optimizations, could've been separate PRs.

Contributor Author:
I think this optimization is necessary for fixing #4129 without affecting performance. Previously we'd never have empty nodes in binpacking, so that wasn't a concern.

Fair point that the scalability improvements and the fix for #4129 should have been separate PRs. They're separate commits, and if you want I can split them into separate PRs easily. But I'm guessing you mean it more as feedback for future PRs?

Member:
Oh, I didn't actually notice they are separate commits; that would have made my reviewing easier... 🤦 Given the low number of my comments, I don't think splitting this PR into two is worth the effort, so I guess that's feedback for future PRs.

// Add new node
newNodeName, err := estimator.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
if err != nil {
klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err)
return 0
return 0, nil
}
newNodeNameIndex++
// And schedule pod to it
if err := estimator.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
newNodeNames[newNodeName] = true
lastNodeName = newNodeName

// And try to schedule pod to it.
// Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
// adding and removing node to snapshot for each such pod.
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
continue
}
if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
return 0
return 0, nil
}
newNodeNames[newNodeName] = true
newNodesWithPods[newNodeName] = true
scheduledPods = append(scheduledPods, podInfo.pod)
}
}
return len(newNodeNames)
return len(newNodesWithPods), scheduledPods
}

func (estimator *BinpackingNodeEstimator) addNewNodeToSnapshot(
func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
template *schedulerframework.NodeInfo,
nameIndex int) (string, error) {

newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("estimator-%d", nameIndex))
newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex))
var pods []*apiv1.Pod
for _, podInfo := range newNodeInfo.Pods {
pods = append(pods, podInfo.Pod)
}
if err := estimator.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
return "", err
}
return newNodeInfo.Node().Name, nil
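
The EstimationLimiter consulted above (StartEstimation, PermissionToAddNode, EndEstimation) and the NewThresholdBasedEstimationLimiter constructor used in autoscaler.go are defined in files outside this excerpt. The following is a hedged sketch of that contract and a threshold-based implementation, assuming the limiter simply counts granted nodes against MaxNodesPerScaleUp and checks elapsed time against MaxNodeGroupBinpackingDuration, with zero meaning "no limit" (consistent with the (0, 0) arguments in the test helper above).

```go
package estimator

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// EstimationLimiter bounds the amount of work a single estimation can do.
type EstimationLimiter interface {
	// StartEstimation is called once before binpacking a node group.
	StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup)
	// EndEstimation is called once binpacking for the node group finishes.
	EndEstimation()
	// PermissionToAddNode is asked before each new simulated node; returning
	// false makes the estimator stop and return its partial result.
	PermissionToAddNode() bool
}

type thresholdBasedEstimationLimiter struct {
	maxNodes    int
	maxDuration time.Duration
	nodes       int
	start       time.Time
}

func (l *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {
	l.start = time.Now()
	l.nodes = 0
}

func (*thresholdBasedEstimationLimiter) EndEstimation() {}

func (l *thresholdBasedEstimationLimiter) PermissionToAddNode() bool {
	if l.maxNodes > 0 && l.nodes >= l.maxNodes {
		return false
	}
	if l.maxDuration > 0 && time.Since(l.start) > l.maxDuration {
		return false
	}
	l.nodes++
	return true
}

// NewThresholdBasedEstimationLimiter returns a limiter that stops estimation
// after maxNodes nodes or maxDuration of wall-clock time, whichever comes
// first. A zero threshold disables that particular limit.
func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter {
	return &thresholdBasedEstimationLimiter{maxNodes: maxNodes, maxDuration: maxDuration}
}
```
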