Limit the rate of adding new unneeded nodes #5556

Merged 1 commit on Mar 2, 2023
60 changes: 57 additions & 3 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -40,8 +40,6 @@ import (
klog "k8s.io/klog/v2"
)

const unneededNodesLimit = 1000

type eligibilityChecker interface {
FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info, []*simulator.UnremovableNode)
}
@@ -68,6 +66,7 @@ type Planner struct {
rs removalSimulator
actuationInjector *scheduling.HintingSimulator
latestUpdate time.Time
minUpdateInterval time.Duration
eligibilityChecker eligibilityChecker
nodeUtilizationMap map[string]utilization.Info
actuationStatus scaledown.ActuationStatus
@@ -79,6 +78,10 @@ type Planner struct {
// New creates a new Planner object.
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner {
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
if minUpdateInterval == 0*time.Nanosecond {
minUpdateInterval = 1 * time.Nanosecond
}
return &Planner{
context: context,
unremovableNodes: unremovable.NewNodes(),
@@ -90,13 +93,18 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
resourceLimitsFinder: resourceLimitsFinder,
cc: newControllerReplicasCalculator(context.ListerRegistry),
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
minUpdateInterval: minUpdateInterval,
}
}

// UpdateClusterState needs to be periodically invoked to provide Planner with
// up-to-date information about the cluster.
// Planner will evaluate scaleDownCandidates in the order provided here.
func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, currentTime time.Time) errors.AutoscalerError {
updateInterval := currentTime.Sub(p.latestUpdate)
if updateInterval < p.minUpdateInterval {
p.minUpdateInterval = updateInterval
}
p.latestUpdate = currentTime
p.actuationStatus = as
// Avoid persisting changes done by the simulation.
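One detail worth spelling out (my reading of the change, not text from the PR): minUpdateInterval is seeded with ScaleDownUnneededTime and floored at 1ns because unneededNodesLimit below divides ScaleDownUnneededTime by this interval using integer arithmetic, so a zero interval would panic, and the seed keeps the first iteration's upper bound conservative until a real loop interval has been observed. A minimal sketch of the guarded division, with made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up values: ScaleDownUnneededTime of 0 (instant removal) and the
	// 1ns floor that New() applies to minUpdateInterval.
	n := 10                                // MaxScaleDownParallelism
	u := int64(0 * time.Minute)            // ScaleDownUnneededTime
	loopInterval := int64(time.Nanosecond) // without the floor this would be 0 and u/loopInterval would panic
	if u < loopInterval {
		u = loopInterval
	}
	fmt.Println(n*int(u/loopInterval) + n) // 20, i.e. 2*N
}
```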
@@ -254,10 +262,14 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
timer := time.NewTimer(p.context.ScaleDownSimulationTimeout)

for i, node := range currentlyUnneededNodeNames {
if timedOut(timer) || len(removableList) >= unneededNodesLimit {
if timedOut(timer) {
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
break
}
if len(removableList) >= p.unneededNodesLimit() {
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList))
break
}
removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker.GetPdbs())
if removable != nil {
_, inParallel, _ := p.context.RemainingPdbTracker.CanRemovePods(removable.PodsToReschedule)
@@ -279,6 +291,48 @@
}
}

// unneededNodesLimit returns the number of nodes after which calculating more
// unneeded nodes is a waste of time. The reasoning behind it is essentially as
// follows.
// If the nodes are being removed instantly, then during each iteration we're
// going to delete up to MaxScaleDownParallelism nodes. Therefore, it doesn't
// really make sense to add more unneeded nodes than that.
// Let N = MaxScaleDownParallelism. When there are no unneeded nodes, we only
// need to find N of them in the first iteration. Once the unneeded time
// accumulates for them, only up to N will get deleted in a single iteration.
// When there are >0 unneeded nodes, we only need to add N more: once the first
// N are deleted, we'll need another iteration for the next N nodes to get
// deleted.
// Of course, a node may stop being unneeded at any given time. To prevent a
// slowdown stemming from having too few unneeded nodes, we add an extra
// buffer of N nodes. Note that we don't have to be super precise about the
// buffer size - if it is too small, we'll simply remove fewer than N nodes
// in one iteration.
// Finally, we know that in practice nodes are not removed instantly,
// especially when they require draining, so incrementing the limit by N every
// loop may let it grow too large after a number of loops. To help with that,
// we can put another, non-incremental upper bound on the limit: with max
// unneeded time U and loop interval I, we're going to have up to U/I loops
// before a node is removed. This means that the total number of unneeded
// nodes shouldn't really exceed N*U/I - scale down will not be able to keep
// up with removing them anyway.
func (p *Planner) unneededNodesLimit() int {
n := p.context.AutoscalingOptions.MaxScaleDownParallelism
extraBuffer := n
limit := len(p.unneededNodes.AsList()) + n + extraBuffer
// TODO(x13n): Use moving average instead of min.
loopInterval := int64(p.minUpdateInterval)
u := int64(p.context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime)
if u < loopInterval {
u = loopInterval
}
upperBound := n*int(u/loopInterval) + extraBuffer
if upperBound < limit {
return upperBound
}
return limit
}

// getKnownOwnerRef returns ownerRef that is known by CA and CA knows the logic of how this controller recreates pods.
func getKnownOwnerRef(ownerRefs []metav1.OwnerReference) *metav1.OwnerReference {
for _, ownerRef := range ownerRefs {
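To make the arithmetic above concrete, here is a small standalone sketch of the same formula (an illustration only; the real method reads maxParallelism, unneededTime, and the observed loop interval from the Planner and its AutoscalingContext, and the values below are taken from the new test in planner_test.go): with N = MaxScaleDownParallelism = 10, U = ScaleDownUnneededTime = 1 minute, and loop interval I = 10 seconds, the incremental limit is current unneeded + 2N, while the absolute cap is N*U/I + N = 70.

```go
package main

import (
	"fmt"
	"time"
)

// unneededNodesLimit mirrors the formula in planner.go for illustration.
func unneededNodesLimit(currentUnneeded, maxParallelism int, unneededTime, loopInterval time.Duration) int {
	n := maxParallelism
	extraBuffer := n
	limit := currentUnneeded + n + extraBuffer
	u, i := int64(unneededTime), int64(loopInterval)
	if u < i {
		u = i
	}
	upperBound := n*int(u/i) + extraBuffer
	if upperBound < limit {
		return upperBound
	}
	return limit
}

func main() {
	// No unneeded nodes yet: the incremental limit (0+10+10 = 20) is below the cap of 70.
	fmt.Println(unneededNodesLimit(0, 10, time.Minute, 10*time.Second)) // 20
	// 77 nodes already unneeded: the cap N*U/I+N = 10*6+10 = 70 wins.
	fmt.Println(unneededNodesLimit(77, 10, time.Minute, 10*time.Second)) // 70
}
```

These two values line up with the "no unneeded" and "too many unneeded" rows of the test table below.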
116 changes: 115 additions & 1 deletion cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package planner

import (
"fmt"
"testing"
"time"

@@ -480,7 +481,13 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
provider := testprovider.NewTestCloudProvider(nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 1 * time.Second}, &fake.Clientset{}, registry, provider, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Second,
MaxScaleDownParallelism: 10,
}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := simulator.NodeDeleteOptions{}
@@ -506,6 +513,113 @@
}
}

func TestUpdateClusterStateUnneededNodesLimit(t *testing.T) {
testCases := []struct {
name string
previouslyUnneeded int
nodes int
maxParallelism int
maxUnneededTime time.Duration
updateInterval time.Duration
wantUnneeded int
}{
{
name: "no unneeded, default settings",
previouslyUnneeded: 0,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 20,
},
{
name: "some unneeded, default settings",
previouslyUnneeded: 3,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 23,
},
{
name: "max unneeded, default settings",
previouslyUnneeded: 70,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 70,
},
{
name: "too many unneeded, default settings",
previouslyUnneeded: 77,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 70,
},
{
name: "instant kill nodes",
previouslyUnneeded: 0,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 0 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 20,
},
{
name: "quick loops",
previouslyUnneeded: 13,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 1 * time.Second,
wantUnneeded: 33,
},
{
name: "slow loops",
previouslyUnneeded: 13,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 30 * time.Second,
wantUnneeded: 30,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
nodes := make([]*apiv1.Node, tc.nodes)
for i := 0; i < tc.nodes; i++ {
nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
}
previouslyUnneeded := make([]simulator.NodeToBeRemoved, tc.previouslyUnneeded)
for i := 0; i < tc.previouslyUnneeded; i++ {
previouslyUnneeded[i] = simulator.NodeToBeRemoved{Node: nodes[i]}
}
provider := testprovider.NewTestCloudProvider(nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: tc.maxUnneededTime,
},
ScaleDownSimulationTimeout: 1 * time.Hour,
MaxScaleDownParallelism: tc.maxParallelism,
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := simulator.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
assert.NoError(t, p.UpdateClusterState(nodes, nodes, &fakeActuationStatus{}, time.Now()))
assert.Equal(t, tc.wantUnneeded, len(p.unneededNodes.AsList()))
})
}
}

func generateReplicaSets(name string, replicas int32) []*appsv1.ReplicaSet {
return []*appsv1.ReplicaSet{
{
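As a quick sanity check of the expected values against the formula (my own arithmetic, not part of the PR): in the "quick loops" case the incremental limit is 13 + 10 + 10 = 33 while the cap is 10*(60s/1s) + 10 = 610, so 33 wins; in the "slow loops" case the incremental limit is again 33 but the cap is 10*(60s/30s) + 10 = 30, so 30 wins. Both match the wantUnneeded values of 33 and 30 in the table above.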