Skip to content

Commit

Permalink
handling placeholder instances with no scaling activities error
Browse files Browse the repository at this point in the history
  • Loading branch information
ravisinha0506 committed Jun 17, 2024
1 parent 4a5d281 commit 9a0830e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 232 deletions.
176 changes: 45 additions & 131 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,123 +308,76 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

var isRecentScalingActivitySuccess = false
var err error

placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s",
placeHolderInstancesCount, commonAsg.Name)

// Retrieve the most recent scaling activity to determine its success state.
isRecentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)
asgNames := []string{commonAsg.Name}
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}

if !isRecentScalingActivitySuccess {
asgDetail, err := m.getDescribeAutoScalingGroupResults(commonAsg)

if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}
activeInstancesInAsg := len(asgDetail[0].Instances)
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)

activeInstancesInAsg := len(asgDetail.Instances)
desiredCapacityInAsg := int(*asgDetail.DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d ",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)
// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG

// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG
err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
return nil
if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
return nil
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}

params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}
}
return nil
}

func (m *asgCache) getDescribeAutoScalingGroupResults(commonAsg *asg) (*autoscaling.Group, error) {
asgs := make([]*autoscaling.Group, 0)
commonAsgNames := []string{commonAsg.Name}
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice(commonAsgNames),
MaxRecords: aws.Int64(100),
}
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}

err := m.awsService.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool {
asgs = append(asgs, output.AutoScalingGroups...)
// We return true while we want to be called with the next page of
// results, if any.
return false
})
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)

if err != nil {
klog.Errorf("Failed while performing DescribeAutoScalingGroupsPages: %v", err)
return nil, err
}
// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

if len(asgs) == 0 {
return nil, fmt.Errorf("no ASGs found for %s", commonAsgNames)
}

return asgs[0], nil
return nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
Expand Down Expand Up @@ -700,45 +653,6 @@ func (m *asgCache) Cleanup() {
close(m.interrupt)
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if *lastActivity.StatusCode == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
} else {
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
return false, nil
}
}

// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {

Expand Down
103 changes: 2 additions & 101 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ limitations under the License.
package aws

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -28,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -569,22 +567,6 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StatusMessage: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

// Look up the current number of instances...
var expectedInstancesCount int64 = 2
a.On("DescribeAutoScalingGroupsPages",
Expand Down Expand Up @@ -620,7 +602,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
Expand Down Expand Up @@ -756,84 +738,3 @@ func TestHasInstance(t *testing.T) {
assert.NoError(t, err)
assert.False(t, present)
}

// write unit test for DeleteInstances function
func TestDeleteInstances_scalingActivityFailure(t *testing.T) {

a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))

asgs := provider.NodeGroups()
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(1),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})
var expectedInstancesCount int64 = 5
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(100),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0c257f8f05fd1c64b", "i-0c257f8f05fd1c64c", "i-0c257f8f05fd1c64d"), false)
// we expect the instance count to be 1 after the call to DeleteNodes
//expectedInstancesCount =
}).Return(nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Failed"),
StatusMessage: aws.String("Launching a new EC2 instance. Status Reason: We currently do not have sufficient p5.48xlarge capacity in zones with support for 'gp2' volumes. Our system will be working on provisioning additional capacity. Launching EC2 instance failed.\t"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(3),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 5, initialSize)

nodes := []*apiv1.Node{}
asgToInstances := provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}]
for _, instance := range asgToInstances {
nodes = append(nodes, &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: instance.ProviderID,
},
})
}

err = asgs[0].DeleteNodes(nodes)
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, newSize)

}

0 comments on commit 9a0830e

Please sign in to comment.