Merge pull request #32 from airbnb/bruno_batista--cluster-autoscaler-patch-cluster-autoscaler-1.22.14-airbnb0

cluster autoscaler 1.22.14 release
bcostabatista authored Nov 8, 2022
2 parents c147ba9 + 136261d commit fad33fb
Showing 26 changed files with 813 additions and 184 deletions.
39 changes: 35 additions & 4 deletions cluster-autoscaler/cloudprovider/aws/README.md
@@ -48,10 +48,41 @@ discover](#auto-discovery-setup) EC2 Auto Scaling Groups **(recommended)**, add
Launch Configuration) and/or `ec2:DescribeLaunchTemplateVersions` (if you
created your ASG using a Launch Template) to the `Action` list.

If you prefer, you can restrict the target resources for the autoscaling actions
by specifying Auto Scaling Group ARNs in the `Resource` list of the policy. More
information can be found
[here](https://docs.aws.amazon.com/autoscaling/latest/userguide/control-access-using-iam.html#policy-auto-scaling-resources).
*NOTE:* Before using the policies and arguments below, replace the ASG names, AWS account ID, and AWS region with values appropriate for your cluster.

The following policy provides the minimum privileges necessary for Cluster Autoscaler to run.
When using this policy, you cannot use autodiscovery of ASGs. In addition, it restricts the
IAM permissions to the node groups the Cluster Autoscaler is configured to scale.

This in turn means that you must pass the following arguments to the Cluster Autoscaler
binary, replacing the min/max node counts and ASG names as appropriate:

```bash
--aws-use-static-instance-list=false
--nodes=1:100:exampleASG1
--nodes=1:100:exampleASG2
```

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:DescribeAutoScalingInstances",
"autoscaling:DescribeLaunchConfigurations",
"autoscaling:DescribeScalingActivities",
"autoscaling:SetDesiredCapacity",
"autoscaling:TerminateInstanceInAutoScalingGroup"
],
"Resource": ["arn:aws:autoscaling:${YOUR_CLUSTER_AWS_REGION}:${YOUR_AWS_ACCOUNT_ID}:autoScalingGroup:*:autoScalingGroupName/${YOUR_ASG_NAME}"]
}
]
}
```

### Using OIDC Federated Authentication

13 changes: 7 additions & 6 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -67,7 +67,7 @@ type asg struct {
minSize int
maxSize int
curSize int
lastUpdateTime *time.Time
lastUpdateTime time.Time

AvailabilityZones []string
LaunchConfigurationName string
@@ -254,7 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
}

// Proactively set the ASG size so autoscaler makes better decisions
asg.lastUpdateTime = &start
asg.lastUpdateTime = start
asg.curSize = size

return nil
@@ -479,7 +479,6 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut
func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: group.AutoScalingGroupName,
MaxRecords: aws.Int64(1), // We only care about the most recent event
}

start := time.Now()
@@ -489,12 +488,14 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
return true, err // If we can't describe the scaling activities we assume the node group is available
}

if len(response.Activities) > 0 {
activity := response.Activities[0]
for _, activity := range response.Activities {
asgRef := AwsRef{Name: *group.AutoScalingGroupName}
if a, ok := m.registeredAsgs[asgRef]; ok {
lut := a.lastUpdateTime
if lut != nil && activity.StartTime.After(*lut) && *activity.StatusCode == "Failed" {
if activity.StartTime.Before(lut) {
break
} else if *activity.StatusCode == "Failed" {
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
return false, nil
}
} else {
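The behavioral change above: the ASG cache now stores `lastUpdateTime` by value and scans every recent scaling activity rather than only the newest one, stopping once activities predate CA's last resize and marking the node group unavailable on any `Failed` activity in that window. A minimal, self-contained sketch of that loop, using simplified stand-in types rather than the real `asgCache` and AWS SDK structs:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the AWS SDK and asgCache types used in
// auto_scaling_groups.go; the names and fields here are illustrative only.
type scalingActivity struct {
	StartTime  time.Time
	StatusCode string
}

type cachedASG struct {
	Name           string
	LastUpdateTime time.Time // set when CA last changed the ASG's desired capacity
}

// isGroupAvailable returns false if any scaling activity that started after
// CA's last resize of the group has failed; older activities are ignored.
func isGroupAvailable(group cachedASG, activities []scalingActivity) bool {
	for _, activity := range activities {
		// DescribeScalingActivities returns activities newest-first, so once
		// one predates our last update we can stop looking.
		if activity.StartTime.Before(group.LastUpdateTime) {
			break
		}
		if activity.StatusCode == "Failed" {
			fmt.Printf("ASG %s scaling failed at %v\n", group.Name, activity.StartTime)
			return false
		}
	}
	return true
}

func main() {
	asg := cachedASG{Name: "exampleASG1", LastUpdateTime: time.Unix(9, 0)}
	activities := []scalingActivity{
		{StartTime: time.Unix(10, 0), StatusCode: "Failed"},
		{StartTime: time.Unix(8, 0), StatusCode: "Successful"},
	}
	fmt.Println(isGroupAvailable(asg, activities)) // false: a failure after the last resize
}
```

Dropping `MaxRecords: 1` from the request is what makes this full scan possible; a single failed activity newer than the last CA-initiated resize is enough to back off from the group.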
11 changes: 5 additions & 6 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
@@ -59,7 +59,7 @@ func TestCreatePlaceholders(t *testing.T) {
name string
desiredCapacity *int64
activities []*autoscaling.Activity
groupLastUpdateTime *time.Time
groupLastUpdateTime time.Time
describeErr error
asgToCheck *string
}{
@@ -85,7 +85,7 @@ func TestCreatePlaceholders(t *testing.T) {
StartTime: aws.Time(time.Unix(10, 0)),
},
},
groupLastUpdateTime: aws.Time(time.Unix(9, 0)),
groupLastUpdateTime: time.Unix(9, 0),
},
{
name: "AWS scaling failed event before CA scale_up",
@@ -96,7 +96,7 @@ func TestCreatePlaceholders(t *testing.T) {
StartTime: aws.Time(time.Unix(9, 0)),
},
},
groupLastUpdateTime: aws.Time(time.Unix(10, 0)),
groupLastUpdateTime: time.Unix(10, 0),
},
{
name: "asg not registered",
@@ -107,7 +107,7 @@ func TestCreatePlaceholders(t *testing.T) {
StartTime: aws.Time(time.Unix(10, 0)),
},
},
groupLastUpdateTime: aws.Time(time.Unix(9, 0)),
groupLastUpdateTime: time.Unix(9, 0),
asgToCheck: aws.String("unregisteredAsgName"),
},
}
@@ -128,7 +128,6 @@ func TestCreatePlaceholders(t *testing.T) {
if shouldCallDescribeScalingActivities {
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: asgName,
MaxRecords: aws.Int64(1),
}).Return(
&autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities},
tc.describeErr,
@@ -158,7 +157,7 @@ func TestCreatePlaceholders(t *testing.T) {
}
asgCache.createPlaceholdersForDesiredNonStartedInstances(groups)
assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity)
if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(*tc.groupLastUpdateTime) && asgName == registeredAsgName {
if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName {
assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus)
} else if len(groups[0].Instances) > 0 {
assert.Equal(t, *groups[0].Instances[0].HealthStatus, "")
@@ -466,7 +466,6 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

2 changes: 0 additions & 2 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
@@ -394,7 +394,6 @@ func TestFetchExplicitAsgs(t *testing.T) {
a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("coolasg"),
MaxRecords: aws.Int64(1),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

@@ -559,7 +558,6 @@ func TestFetchAutoAsgs(t *testing.T) {
a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("coolasg"),
MaxRecords: aws.Int64(1),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

21 changes: 21 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -169,4 +169,25 @@ type AutoscalingOptions struct {
DaemonSetEvictionForOccupiedNodes bool
// User agent to use for HTTP calls.
UserAgent string
// InitialNodeGroupBackoffDuration is the duration of first backoff after a new node failed to start
InitialNodeGroupBackoffDuration time.Duration
// MaxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.
MaxNodeGroupBackoffDuration time.Duration
// NodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.
NodeGroupBackoffResetTimeout time.Duration
// MaxScaleDownParallelism is the maximum number of nodes (both empty and needing drain) that can be deleted in parallel.
MaxScaleDownParallelism int
// MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel.
MaxDrainParallelism int
// GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.
// Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit
// scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop.
MaxNodesPerScaleUp int
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
}
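The new `AutoscalingOptions` fields expose node-group backoff tuning and bounds on binpacking work during scale-up. A hedged example of populating them follows; the values are illustrative assumptions, not necessarily the defaults shipped with this release (those live with the flag definitions in main.go):

```go
package main

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Illustrative values only; check the flag defaults for the real ones.
	opts := config.AutoscalingOptions{
		InitialNodeGroupBackoffDuration: 5 * time.Minute,  // first backoff after a node fails to start
		MaxNodeGroupBackoffDuration:     30 * time.Minute, // backoff cap per node group
		NodeGroupBackoffResetTimeout:    3 * time.Hour,    // reset backoff after this long without failures
		MaxScaleDownParallelism:         10,
		MaxDrainParallelism:             1,
		MaxNodesPerScaleUp:              1000,             // binpacking node cap per scale-up
		MaxNodeGroupBinpackingDuration:  10 * time.Second, // binpacking time budget per node group
	}
	_ = opts
}
```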
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -110,7 +110,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.ExpanderStrategy = expanderStrategy
}
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName)
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
if err != nil {
return err
}
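`NewEstimatorBuilder` now takes an estimation limiter derived from `MaxNodesPerScaleUp` and `MaxNodeGroupBinpackingDuration`, so binpacking can be cut short once either threshold is hit. The sketch below illustrates the general idea with hypothetical types; it does not reproduce the real `estimator` package interface:

```go
package main

import (
	"fmt"
	"time"
)

// thresholdLimiter is a hypothetical stand-in for a threshold-based
// estimation limiter: it stops binpacking once either a node-count cap
// or a time budget is exceeded.
type thresholdLimiter struct {
	maxNodes    int
	maxDuration time.Duration
	start       time.Time
	nodes       int
}

func (l *thresholdLimiter) StartEstimation() { l.start = time.Now(); l.nodes = 0 }

// PermissionToAddNode reports whether binpacking may place one more node.
func (l *thresholdLimiter) PermissionToAddNode() bool {
	if l.maxNodes > 0 && l.nodes >= l.maxNodes {
		return false // node cap reached
	}
	if l.maxDuration > 0 && time.Since(l.start) > l.maxDuration {
		return false // time budget exhausted
	}
	l.nodes++
	return true
}

func main() {
	l := &thresholdLimiter{maxNodes: 3, maxDuration: 10 * time.Second}
	l.StartEstimation()
	for i := 0; i < 5; i++ {
		fmt.Println(l.PermissionToAddNode()) // true, true, true, false, false
	}
}
```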
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_test_common.go
@@ -165,7 +165,7 @@ func NewScaleTestAutoscalingContext(
}
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName)
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0))
predicateChecker, err := simulator.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -312,7 +312,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG

if len(option.Pods) > 0 {
estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
option.NodeCount = estimator.Estimate(option.Pods, nodeInfo)
option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
}

return option, nil
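With this change, `Estimate` returns both the node count and the subset of pods that the proposed scale-up can actually accommodate, so a capped scale-up no longer pretends to cover every pending pod. A toy illustration of that contract (not the real estimator API):

```go
package main

import "fmt"

// estimate is a hypothetical stand-in for the new Estimate contract: given
// candidate pods and a per-node capacity, it returns how many nodes to add
// and which pods that count actually covers once a node cap is applied.
func estimate(pods []string, podsPerNode, maxNodes int) (int, []string) {
	nodes := (len(pods) + podsPerNode - 1) / podsPerNode
	if nodes > maxNodes {
		nodes = maxNodes
	}
	covered := pods
	if nodes*podsPerNode < len(pods) {
		covered = pods[:nodes*podsPerNode]
	}
	return nodes, covered
}

func main() {
	nodeCount, covered := estimate([]string{"p1", "p2", "p3", "p4", "p5"}, 2, 2)
	fmt.Println(nodeCount, covered) // 2 [p1 p2 p3 p4]: the fifth pod waits for a later loop
}
```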
14 changes: 11 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -351,7 +351,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return nil
}

if a.deleteCreatedNodesWithErrors() {
danglingNodes, err := a.deleteCreatedNodesWithErrors()
if err != nil {
klog.Warningf("Failed to remove nodes that were created with errors, skipping iteration: %v", err)
return nil
}
if danglingNodes {
klog.V(0).Infof("Some nodes that failed to create were removed, skipping iteration")
return nil
}
@@ -643,7 +648,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
return removedAny, nil
}

func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() (bool, error) {
// We always schedule deletion of incoming erroneous nodes
// TODO[lukaszos] Consider adding logic to not retry delete every loop iteration
nodes := a.clusterStateRegistry.GetCreatedNodesWithErrors()
@@ -661,6 +666,9 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
klog.Warningf("Cannot determine nodeGroup for node %v; %v", id, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return false, fmt.Errorf("node %s has no known nodegroup", node.GetName())
}
nodesToBeDeletedByNodeGroupId[nodeGroup.Id()] = append(nodesToBeDeletedByNodeGroupId[nodeGroup.Id()], node)
}

@@ -685,7 +693,7 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
a.clusterStateRegistry.InvalidateNodeInstancesCacheEntry(nodeGroup)
}

return deletedAny
return deletedAny, nil
}

func (a *StaticAutoscaler) nodeGroupsById() map[string]cloudprovider.NodeGroup {
50 changes: 46 additions & 4 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -923,7 +923,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}

func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {

// setup
provider := &mockprovider.CloudProvider{}
@@ -1058,7 +1058,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)

// delete nodes with create errors
assert.True(t, autoscaler.deleteCreatedNodesWithErrors())
removedNodes, err := autoscaler.deleteCreatedNodesWithErrors()
assert.True(t, removedNodes)
assert.NoError(t, err)

// check delete was called on correct nodes
nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
@@ -1082,7 +1084,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)

// delete nodes with create errors
assert.True(t, autoscaler.deleteCreatedNodesWithErrors())
removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
assert.True(t, removedNodes)
assert.NoError(t, err)

// nodes should be deleted again
nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
Expand Down Expand Up @@ -1145,10 +1149,48 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)

// delete nodes with create errors
assert.False(t, autoscaler.deleteCreatedNodesWithErrors())
removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
assert.False(t, removedNodes)
assert.NoError(t, err)

// we expect no more Delete Nodes
nodeGroupA.AssertNumberOfCalls(t, "DeleteNodes", 2)

// failed node not included by NodeGroupForNode
nodeGroupC := &mockprovider.NodeGroup{}
nodeGroupC.On("Exist").Return(true)
nodeGroupC.On("Autoprovisioned").Return(false)
nodeGroupC.On("TargetSize").Return(1, nil)
nodeGroupC.On("Id").Return("C")
nodeGroupC.On("DeleteNodes", mock.Anything).Return(nil)
nodeGroupC.On("Nodes").Return([]cloudprovider.Instance{
{
Id: "C1",
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OutOfResourcesErrorClass,
ErrorCode: "QUOTA",
},
},
},
}, nil)
provider = &mockprovider.CloudProvider{}
provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{nodeGroupC})
provider.On("NodeGroupForNode", mock.Anything).Return(nil, nil)

clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff())
clusterState.RefreshCloudProviderNodeInstancesCache()
autoscaler.clusterStateRegistry = clusterState

// update cluster state
clusterState.UpdateNodes([]*apiv1.Node{}, nil, time.Now())

// return early on failed nodes without matching nodegroups
removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
assert.False(t, removedNodes)
assert.Error(t, err)
nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0)
}

func TestStaticAutoscalerProcessorCallbacks(t *testing.T) {