CA: stop passing registered upcoming nodes as scale-down candidates
Without this change, aggressive scale-down settings could cause scale-down to
remove registered upcoming nodes before they have a chance to become ready
(how long that takes should be unrelated to the scale-down settings).
towca committed Feb 9, 2023
1 parent 6978ff8 commit d81a5c5
Showing 5 changed files with 236 additions and 30 deletions.
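Before the per-file diffs, here is a minimal, self-contained Go sketch of the commit's core mechanic. The node names and the contents of registeredUpcoming are made up for illustration, and subtractNodesByName mirrors the helper introduced later in the diff (static_autoscaler.go). The idea: flatten the per-node-group map of registered upcoming node names and subtract those nodes, by name, from the list handed to scale-down as candidates.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// subtractNodesByName mirrors the helper added in this commit: it returns the
// nodes whose names are not in namesToRemove, preserving the input order.
func subtractNodesByName(nodes []*apiv1.Node, namesToRemove []string) []*apiv1.Node {
	removeSet := make(map[string]bool)
	for _, name := range namesToRemove {
		removeSet[name] = true
	}
	var kept []*apiv1.Node
	for _, n := range nodes {
		if removeSet[n.Name] {
			continue
		}
		kept = append(kept, n)
	}
	return kept
}

func main() {
	// Hypothetical cluster state: one ready node and one registered node that
	// hasn't become ready yet (NotStarted), which GetUpcomingNodes now reports.
	allNodes := []*apiv1.Node{
		{ObjectMeta: metav1.ObjectMeta{Name: "ng1-ready-0"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "ng1-notstarted-0"}},
	}
	registeredUpcoming := map[string][]string{"ng1": {"ng1-notstarted-0"}}

	// Flatten the per-node-group map, as RunOnce does, then filter by name so
	// the NotStarted node is never offered to scale-down as a candidate.
	var toDrop []string
	for _, names := range registeredUpcoming {
		toDrop = append(toDrop, names...)
	}
	for _, n := range subtractNodesByName(allNodes, toDrop) {
		fmt.Println(n.Name) // prints only ng1-ready-0
	}
}

The real change also removes the same nodes from the cluster snapshot (see the static_autoscaler.go hunk below) so that the candidate list and the snapshot stay consistent.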
15 changes: 10 additions & 5 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -916,12 +916,14 @@ func (csr *ClusterStateRegistry) GetIncorrectNodeGroupSize(nodeGroupName string)
}

// GetUpcomingNodes returns how many new nodes will be added shortly to the node groups or should become ready soon.
// The function may overestimate the number of nodes.
func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
// The function may overestimate the number of nodes. The second return value contains the names of upcoming nodes
// that are already registered in the cluster.
func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]int, registeredNodeNames map[string][]string) {
csr.Lock()
defer csr.Unlock()

result := make(map[string]int)
upcomingCounts = map[string]int{}
registeredNodeNames = map[string][]string{}
for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
id := nodeGroup.Id()
readiness := csr.perNodeGroupReadiness[id]
@@ -932,9 +934,12 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
// Negative value is unlikely but theoretically possible.
continue
}
result[id] = newNodes
upcomingCounts[id] = newNodes
// newNodes should be roughly equal to readiness.NotStarted + readiness.Unregistered. NotStarted are nodes that are
// already registered in the cluster but not yet ready, Unregistered are nodes that aren't registered in the cluster yet.
registeredNodeNames[id] = readiness.NotStarted
}
return result
return upcomingCounts, registeredNodeNames
}

// getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
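The clusterstate.go hunk above widens GetUpcomingNodes from one return value to two. A minimal sketch of the contract follows; perGroupReadiness is an illustrative stand-in for the registry's internal readiness bookkeeping, not the real type, and the node names mirror the ng3 case from TestUpcomingNodes below. Per the new doc comment, the count for a node group is roughly NotStarted plus Unregistered, while only the NotStarted names are returned, because only those already exist as registered Node objects.

package main

import "fmt"

// perGroupReadiness is an illustrative stand-in for the registry's readiness
// bookkeeping; the real structure tracks more fields.
type perGroupReadiness struct {
	NotStarted   []string // registered in the cluster, but not Ready yet
	Unregistered []string // created by the cloud provider, not registered yet
}

func main() {
	// Hypothetical node group mirroring the ng3 case in TestUpcomingNodes:
	// one registered-but-not-started node and one unregistered node.
	readiness := map[string]perGroupReadiness{
		"ng3": {NotStarted: []string{"ng3-1"}, Unregistered: []string{"ng3-2"}},
	}

	upcomingCounts := map[string]int{}
	registeredNodeNames := map[string][]string{}
	for id, r := range readiness {
		// Per the new doc comment, the upcoming count is roughly
		// NotStarted + Unregistered; only the NotStarted names are reported,
		// because only those already exist as Node objects in the cluster.
		upcomingCounts[id] = len(r.NotStarted) + len(r.Unregistered)
		registeredNodeNames[id] = r.NotStarted
	}

	fmt.Println(upcomingCounts["ng3"], registeredNodeNames["ng3"]) // 2 [ng3-1]
}

In the diff, scale_up.go needs only the counts and discards the second return value, while static_autoscaler.go consumes both.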
19 changes: 14 additions & 5 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -361,8 +361,9 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
upcoming := clusterstate.GetUpcomingNodes()
upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, upcoming["ng1"])
assert.Empty(t, upcomingRegistered["ng1"])
}

func TestNotStarted(t *testing.T) {
@@ -524,12 +525,17 @@ func TestUpcomingNodes(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 6, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
assert.Equal(t, 1, upcomingNodes["ng2"])
assert.Empty(t, upcomingRegistered["ng2"]) // Only unregistered.
assert.Equal(t, 2, upcomingNodes["ng3"])
assert.Equal(t, []string{"ng3-1"}, upcomingRegistered["ng3"]) // 1 registered, 1 unregistered.
assert.NotContains(t, upcomingNodes, "ng4")
assert.NotContains(t, upcomingRegistered, "ng4")
assert.Equal(t, 0, upcomingNodes["ng5"])
assert.Empty(t, upcomingRegistered["ng5"])
}

func TestTaintBasedNodeDeletion(t *testing.T) {
@@ -566,8 +572,9 @@ func TestTaintBasedNodeDeletion(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
}

func TestIncorrectSize(t *testing.T) {
@@ -624,17 +631,19 @@ func TestUnregisteredNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Unregistered only.

// The node didn't come up in MaxNodeProvisionTime, it should no longer be
// counted as upcoming (but it is still an unregistered node)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes = clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered = clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, len(upcomingNodes))
assert.Empty(t, upcomingRegistered["ng1"])

err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -178,8 +178,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)

upcomingCounts, _ := clusterStateRegistry.GetUpcomingNodes()
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(
54 changes: 43 additions & 11 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -454,9 +454,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}
}

// add upcoming nodes to ClusterSnapshot
upcomingNodes := getUpcomingNodeInfos(a.clusterStateRegistry, nodeInfosForGroups)
for _, upcomingNode := range upcomingNodes {
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
// them and not trigger another scale-up.
// The fake nodes are intentionally not added to the all nodes list, so that they are not considered as candidates for scale-down (which
// doesn't make sense as they're not real).
for _, upcomingNode := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
var pods []*apiv1.Pod
for _, podInfo := range upcomingNode.Pods {
pods = append(pods, podInfo.Pod)
@@ -467,6 +471,22 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.InternalError, err)
}
}
// Some upcoming nodes can already be registered in the cluster, but not yet ready - we still inject replacements for them above. The actual registered nodes
// have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with aggressive scale-down settings, we
// could be removing the nodes before they have a chance to first become ready (the duration of which should be unrelated to the scale-down settings).
var allRegisteredUpcoming []string
for _, ngRegisteredUpcoming := range registeredUpcoming {
allRegisteredUpcoming = append(allRegisteredUpcoming, ngRegisteredUpcoming...)
}
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
return errors.ToAutoscalerError(errors.InternalError, err)
}
}

l, err := a.ClusterSnapshot.NodeInfos().List()
if err != nil {
@@ -869,9 +889,9 @@ func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}

func getUpcomingNodeInfos(registry *clusterstate.ClusterStateRegistry, nodeInfos map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo {
func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo {
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range registry.GetUpcomingNodes() {
for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
klog.Warningf("Couldn't find template for node group %s", nodeGroup)
@@ -921,17 +941,29 @@ func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.Unremovabl
return counts
}

func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
func subtractNodesByName(nodes []*apiv1.Node, namesToRemove []string) []*apiv1.Node {
var c []*apiv1.Node
namesToDrop := make(map[string]bool)
for _, n := range b {
namesToDrop[n.Name] = true
removeSet := make(map[string]bool)
for _, name := range namesToRemove {
removeSet[name] = true
}
for _, n := range a {
if namesToDrop[n.Name] {
for _, n := range nodes {
if removeSet[n.Name] {
continue
}
c = append(c, n)
}
return c
}

func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
return subtractNodesByName(a, nodeNames(b))
}

func nodeNames(ns []*apiv1.Node) []string {
names := make([]string, len(ns))
for i, node := range ns {
names[i] = node.Name
}
return names
}
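static_autoscaler.go also rewrites subtractNodes as a thin wrapper over the new name-based helper. A hypothetical standalone test sketching the equivalence is shown below; it assumes it lives next to subtractNodesByName, subtractNodes, and nodeNames in the core package, and it mirrors the extra assertions added to TestSubtractNodes at the end of the test file.

package core

import (
	"testing"

	"github.com/stretchr/testify/assert"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Hypothetical test: the old slice-based subtractNodes is now a thin wrapper
// over the name-based helper, so the two must agree on any input.
func TestSubtractNodesByNameAgreesWithSubtractNodes(t *testing.T) {
	node := func(name string) *apiv1.Node {
		return &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
	}
	a := []*apiv1.Node{node("n1"), node("n2"), node("n3")}
	b := []*apiv1.Node{node("n2")}

	byName := subtractNodesByName(a, nodeNames(b))
	bySlice := subtractNodes(a, b)

	assert.Equal(t, nodeNames(bySlice), nodeNames(byName))
	assert.Equal(t, []string{"n1", "n3"}, nodeNames(byName))
}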
175 changes: 167 additions & 8 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -42,6 +42,9 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -1273,6 +1276,167 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0)
}

type candidateTrackingFakePlanner struct {
lastCandidateNodes map[string]bool
}

func (f *candidateTrackingFakePlanner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError {
f.lastCandidateNodes = map[string]bool{}
for _, node := range scaleDownCandidates {
f.lastCandidateNodes[node.Name] = true
}
return nil
}

func (f *candidateTrackingFakePlanner) CleanUpUnneededNodes() {
}

func (f *candidateTrackingFakePlanner) NodesToDelete(currentTime time.Time) (empty, needDrain []*apiv1.Node) {
return nil, nil
}

func (f *candidateTrackingFakePlanner) UnneededNodes() []*apiv1.Node {
return nil
}

func (f *candidateTrackingFakePlanner) UnremovableNodes() []*simulator.UnremovableNode {
return nil
}

func (f *candidateTrackingFakePlanner) NodeUtilizationMap() map[string]utilization.Info {
return nil
}

func assertSnapshotNodeCount(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, wantCount int) {
nodeInfos, err := snapshot.NodeInfos().List()
assert.NoError(t, err)
assert.Len(t, nodeInfos, wantCount)
}

func assertNodesNotInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) {
nodeInfos, err := snapshot.NodeInfos().List()
assert.NoError(t, err)
for _, nodeInfo := range nodeInfos {
assert.NotContains(t, nodeNames, nodeInfo.Node().Name)
}
}

func assertNodesInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) {
nodeInfos, err := snapshot.NodeInfos().List()
assert.NoError(t, err)
snapshotNodeNames := map[string]bool{}
for _, nodeInfo := range nodeInfos {
snapshotNodeNames[nodeInfo.Node().Name] = true
}
for nodeName := range nodeNames {
assert.Contains(t, snapshotNodeNames, nodeName)
}
}

func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
startTime := time.Time{}

// Generate a number of ready and unready nodes created at startTime, spread across multiple node groups.
provider := testprovider.NewTestCloudProvider(nil, nil)
allNodeNames := map[string]bool{}
readyNodeNames := map[string]bool{}
notReadyNodeNames := map[string]bool{}
var allNodes []*apiv1.Node
var readyNodes []*apiv1.Node

readyNodesCount := 4
unreadyNodesCount := 2
nodeGroupCount := 2
for ngNum := 0; ngNum < nodeGroupCount; ngNum++ {
ngName := fmt.Sprintf("ng-%d", ngNum)
provider.AddNodeGroup(ngName, 0, 1000, readyNodesCount+unreadyNodesCount)

for i := 0; i < readyNodesCount; i++ {
node := BuildTestNode(fmt.Sprintf("%s-ready-node-%d", ngName, i), 2000, 1000)
node.CreationTimestamp = metav1.NewTime(startTime)
SetNodeReadyState(node, true, startTime)
provider.AddNode(ngName, node)

allNodes = append(allNodes, node)
allNodeNames[node.Name] = true

readyNodes = append(readyNodes, node)
readyNodeNames[node.Name] = true
}
for i := 0; i < unreadyNodesCount; i++ {
node := BuildTestNode(fmt.Sprintf("%s-unready-node-%d", ngName, i), 2000, 1000)
node.CreationTimestamp = metav1.NewTime(startTime)
SetNodeReadyState(node, false, startTime)
provider.AddNode(ngName, node)

allNodes = append(allNodes, node)
allNodeNames[node.Name] = true

notReadyNodeNames[node.Name] = true
}
}

// Create fake listers for the generated nodes, nothing returned by the rest (but the ones used in the tested path have to be defined).
allNodeLister := kubernetes.NewTestNodeLister(allNodes)
readyNodeLister := kubernetes.NewTestNodeLister(readyNodes)
daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil)

// Create context with minimal options that guarantee we reach the tested logic.
// We're only testing the input to UpdateClusterState which should be called whenever scale-down is enabled, other options shouldn't matter.
options := config.AutoscalingOptions{ScaleDownEnabled: true}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil)
assert.NoError(t, err)

// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount}
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff())

// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{})
ctx.ScaleDownActuator = actuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
planner := &candidateTrackingFakePlanner{}

autoscaler := &StaticAutoscaler{
AutoscalingContext: &ctx,
clusterStateRegistry: csr,
scaleDownActuator: actuator,
scaleDownPlanner: planner,
processors: NewTestProcessors(&ctx),
processorCallbacks: processorCallbacks,
}

// RunOnce run right when the nodes are created. Ready nodes should be passed as scale-down candidates, unready nodes should be classified as
// NotStarted and not passed as scale-down candidates (or inserted into the cluster snapshot). The fake upcoming nodes also shouldn't be passed,
// but they should be inserted into the snapshot.
err = autoscaler.RunOnce(startTime)
assert.NoError(t, err)
assert.Equal(t, readyNodeNames, planner.lastCandidateNodes)
assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames)
assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames)
assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes.

// RunOnce run in the last moment when unready nodes are still classified as NotStarted - assertions the same as above.
err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(-time.Second))
assert.NoError(t, err)
assert.Equal(t, readyNodeNames, planner.lastCandidateNodes)
assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames)
assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames)
assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes.

// RunOnce run in the first moment when unready nodes exceed the startup threshold, stop being classified as NotStarted, and start being classified
// Unready instead. The unready nodes should be passed as scale-down candidates at this point, and inserted into the snapshot. Fake upcoming
// nodes should no longer be inserted.
err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(time.Second))
assert.NoError(t, err)
assert.Equal(t, allNodeNames, planner.lastCandidateNodes)
assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, allNodeNames)
assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + actual unready nodes.
}

func TestStaticAutoscalerProcessorCallbacks(t *testing.T) {
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
assert.Equal(t, false, processorCallbacks.disableScaleDownForLoop)
@@ -1426,6 +1590,9 @@ func TestSubtractNodes(t *testing.T) {
for _, tc := range testCases {
got := subtractNodes(tc.a, tc.b)
assert.Equal(t, nodeNames(got), nodeNames(tc.c))

got = subtractNodesByName(tc.a, nodeNames(tc.b))
assert.Equal(t, nodeNames(got), nodeNames(tc.c))
}
}

@@ -1526,14 +1693,6 @@ func TestFilterOutYoungPods(t *testing.T) {
}
}

func nodeNames(ns []*apiv1.Node) []string {
names := make([]string, len(ns))
for i, node := range ns {
names[i] = node.Name
}
return names
}

func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
select {
case <-deleteFinished:
