Improve launch template diff detection for MachinePools #4194

Closed
136 changes: 82 additions & 54 deletions exp/controllers/awsmachinepool_controller.go
@@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@@ -233,7 +234,50 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP
// that change will not trigger a refresh. Do not start an instance refresh if only userdata changed.
return asgsvc.CanStartASGInstanceRefresh(machinePoolScope)
}

// Find existing ASG
asg, err := r.findASG(machinePoolScope, asgsvc)
if err != nil {
conditions.MarkUnknown(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, expinfrav1.ASGNotFoundReason, err.Error())
return ctrl.Result{}, err
}
if asg == nil {
machinePoolScope.Debug("asg not created yet")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, expinfrav1.ASGNotFoundReason, "Unable to find matching ASG")
canUpdateLaunchTemplate = func() (bool, error) {
return true, nil
}
}

runPostLaunchTemplateUpdateOperation := func() error {
launchTemplateID := machinePoolScope.GetLaunchTemplateIDStatus()
Contributor

Shouldn't all of this happen after the check that skips the instance refresh? If instance refresh is completely disabled, none of this should happen, should it? Updating the tags, for example.

Contributor

Ah, never mind. Of course, the ASG is using the launch template, so we ensure that it's up to date. Right?

Contributor Author

We must reconcile tags after updating the launch template, which is why this logic was moved to the beginning of runPostLaunchTemplateUpdateOperation: we need to update our record of the tags on the launch template regardless of whether we are doing a refresh. The tags are a property of launch templates, not ASGs.

I didn't write this logic; I only relocated it from elsewhere in the codepath. Previously we only reconciled tags if the ASG could be created, which is too late, because an error could have occurred.
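
The ordering described in this reply can be sketched in isolation. This is a minimal illustration under stated assumptions, not the controller's code: `tagReconciler`, `instanceRefresher`, `postUpdateOptions`, `runPostLaunchTemplateUpdate` and `recorder` are hypothetical names, and the real callback operates on the `MachinePoolScope` and the EC2/ASG services.

```go
package main

import "fmt"

// tagReconciler and instanceRefresher are hypothetical stand-ins for the
// EC2 and ASG services that the controller talks to.
type tagReconciler interface {
	ReconcileTags(resourceIDs []string) error
}

type instanceRefresher interface {
	StartInstanceRefresh(asgName string) error
}

// postUpdateOptions carries the inputs the callback needs (assumed shape).
type postUpdateOptions struct {
	LaunchTemplateID string
	ASGName          string
	ASGExists        bool
	RefreshDisabled  bool
}

// runPostLaunchTemplateUpdate reconciles tags first, and only afterwards
// decides whether an instance refresh should be started.
func runPostLaunchTemplateUpdate(t tagReconciler, r instanceRefresher, o postUpdateOptions) error {
	// Tags belong to the launch template, so they are reconciled even when
	// no refresh will happen.
	if err := t.ReconcileTags([]string{o.LaunchTemplateID, o.ASGName}); err != nil {
		return fmt.Errorf("error updating tags: %w", err)
	}
	if !o.ASGExists {
		return nil // nothing to refresh yet
	}
	if o.RefreshDisabled {
		return nil // refresh explicitly disabled
	}
	return r.StartInstanceRefresh(o.ASGName)
}

// recorder is a fake that records which operations were invoked.
type recorder struct{ calls []string }

func (r *recorder) ReconcileTags(ids []string) error {
	r.calls = append(r.calls, "tags")
	return nil
}

func (r *recorder) StartInstanceRefresh(name string) error {
	r.calls = append(r.calls, "refresh")
	return nil
}

func main() {
	rec := &recorder{}
	opts := postUpdateOptions{LaunchTemplateID: "lt-1", ASGName: "pool", ASGExists: true, RefreshDisabled: true}
	if err := runPostLaunchTemplateUpdate(rec, rec, opts); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(rec.calls)
}
```

With refresh disabled, the fake still records the tag reconciliation, which is the behaviour the reply argues for.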

asgName := machinePoolScope.Name()
resourceServiceToUpdate := []scope.ResourceServiceToUpdate{
{
ResourceID: &launchTemplateID,
ResourceService: ec2Svc,
},
{
ResourceID: &asgName,
ResourceService: asgsvc,
},
}
// Update the AWSMachinePool annotation immediately after creating a new template
err = ec2Svc.ReconcileTags(machinePoolScope, resourceServiceToUpdate)
if err != nil {
return errors.Wrap(err, "error updating tags")
}

asg, err := r.findASG(machinePoolScope, asgsvc)
if err != nil {
return fmt.Errorf("failed to find asg %q for instance refresh: %w", asgName, err)
}
if asg == nil {
machinePoolScope.Debug("asg not created yet, skipping instance refresh")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, expinfrav1.ASGNotFoundReason, "Unable to find matching ASG")
return nil
}

// skip instance refresh if explicitly disabled
if machinePoolScope.AWSMachinePool.Spec.RefreshPreferences != nil && machinePoolScope.AWSMachinePool.Spec.RefreshPreferences.Disable {
machinePoolScope.Debug("instance refresh disabled, skipping instance refresh")
@@ -260,20 +304,13 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP
// set the LaunchTemplateReady condition
conditions.MarkTrue(machinePoolScope.AWSMachinePool, expinfrav1.LaunchTemplateReadyCondition)

// Find existing ASG
asg, err := r.findASG(machinePoolScope, asgsvc)
if err != nil {
conditions.MarkUnknown(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, expinfrav1.ASGNotFoundReason, err.Error())
return ctrl.Result{}, err
}

if asg == nil {
Contributor

So I'm a bit confused here. We reconcile the launch template, which does check for an ASG, but we only do the "if the ASG doesn't exist, create it and requeue" step AFTER we have reconciled the launch template, which was looking for an ASG. It's obvious that the ASG won't be there, so why not create it if it doesn't exist, then requeue and proceed? I'm not sure I get the order of things here. :D Can you educate me please? :)

Contributor Author

ASG creation depends on the launch template having been created, and the ASG instance refresh depends on the launch template having been modified. I think these two dependencies are coupled in the implementation, and that coupling is causing the confusion.

My understanding is that we can't move the ASG creation before ec2Svc.ReconcileLaunchTemplate is invoked, because we need that to succeed to ensure a launch template exists to use for the ASG.

However, that is also the function that detects if we modified the launch template and runs the post-update callback.

Maybe we should open a separate issue about clarifying these codepaths? I'm hesitant to add more to this PR or else it will never get merged...

// Create new ASG
if err := r.createPool(machinePoolScope, clusterScope); err != nil {
conditions.MarkFalse(machinePoolScope.AWSMachinePool, expinfrav1.ASGReadyCondition, expinfrav1.ASGProvisionFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

if scope.ReplicasExternallyManaged(machinePoolScope.MachinePool) {
@@ -294,23 +331,6 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP
return ctrl.Result{}, err
}

launchTemplateID := machinePoolScope.GetLaunchTemplateIDStatus()
asgName := machinePoolScope.Name()
resourceServiceToUpdate := []scope.ResourceServiceToUpdate{
{
ResourceID: &launchTemplateID,
ResourceService: ec2Svc,
},
{
ResourceID: &asgName,
ResourceService: asgsvc,
},
}
err = ec2Svc.ReconcileTags(machinePoolScope, resourceServiceToUpdate)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "error updating tags")
}

// Make sure Spec.ProviderID is always set.
machinePoolScope.AWSMachinePool.Spec.ProviderID = asg.ID
providerIDList := make([]string, len(asg.Instances))
@@ -347,7 +367,7 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi

if asg == nil {
machinePoolScope.Debug("Unable to locate ASG")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, "NoASGFound", "Unable to find matching ASG")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, expinfrav1.ASGNotFoundReason, "Unable to find matching ASG")
} else {
machinePoolScope.SetASGStatus(asg.Status)
switch asg.Status {
@@ -374,7 +394,7 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi

if launchTemplate == nil {
machinePoolScope.Debug("Unable to locate launch template")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, "NoASGFound", "Unable to find matching ASG")
r.Recorder.Eventf(machinePoolScope.AWSMachinePool, corev1.EventTypeNormal, expinfrav1.ASGNotFoundReason, "Unable to find matching ASG")
controllerutil.RemoveFinalizer(machinePoolScope.AWSMachinePool, expinfrav1.MachinePoolFinalizer)
return ctrl.Result{}, nil
}
@@ -404,9 +424,16 @@ func (r *AWSMachinePoolReconciler) updatePool(machinePoolScope *scope.MachinePoo
"subnets of machinePoolScope", subnetIDs,
"subnets of existing asg", existingASG.Subnets)
less := func(a, b string) bool { return a < b }
subnetChanges := cmp.Diff(subnetIDs, existingASG.Subnets, cmpopts.SortSlices(less)) != ""
subnetDiff := cmp.Diff(subnetIDs, existingASG.Subnets, cmpopts.SortSlices(less))
if subnetDiff != "" {
machinePoolScope.Debug("asg subnet diff detected", "diff", subnetDiff)
}

if asgNeedsUpdates(machinePoolScope, existingASG) || subnetChanges {
asgDiff := diffASG(machinePoolScope, existingASG)
if asgDiff != "" {
machinePoolScope.Debug("asg diff detected", "diff", asgDiff)
}
if asgDiff != "" || subnetDiff != "" {
machinePoolScope.Info("updating AutoScalingGroup")

if err := asgSvc.UpdateASG(machinePoolScope); err != nil {
@@ -493,36 +520,37 @@ func (r *AWSMachinePoolReconciler) findASG(machinePoolScope *scope.MachinePoolSc
return asg, nil
}

// asgNeedsUpdates compares incoming AWSMachinePool and compares against existing ASG.
func asgNeedsUpdates(machinePoolScope *scope.MachinePoolScope, existingASG *expinfrav1.AutoScalingGroup) bool {
// diffASG compares the incoming AWSMachinePool against the existing ASG and returns the diff, or an empty string if they match.
func diffASG(machinePoolScope *scope.MachinePoolScope, existingASG *expinfrav1.AutoScalingGroup) string {
detectedMachinePoolSpec := machinePoolScope.MachinePool.Spec.DeepCopy()

if !scope.ReplicasExternallyManaged(machinePoolScope.MachinePool) {
if machinePoolScope.MachinePool.Spec.Replicas != nil {
if existingASG.DesiredCapacity == nil || *machinePoolScope.MachinePool.Spec.Replicas != *existingASG.DesiredCapacity {
return true
}
} else if existingASG.DesiredCapacity != nil {
return true
detectedMachinePoolSpec.Replicas = existingASG.DesiredCapacity
}
if diff := cmp.Diff(machinePoolScope.MachinePool.Spec, *detectedMachinePoolSpec); diff != "" {
return diff
}

detectedAWSMachinePoolSpec := machinePoolScope.AWSMachinePool.Spec.DeepCopy()
detectedAWSMachinePoolSpec.MaxSize = existingASG.MaxSize
detectedAWSMachinePoolSpec.MinSize = existingASG.MinSize
detectedAWSMachinePoolSpec.CapacityRebalance = existingASG.CapacityRebalance
{
mixedInstancesPolicy := machinePoolScope.AWSMachinePool.Spec.MixedInstancesPolicy
// InstancesDistribution is optional, and the default values come from AWS, so
// they are not set by the AWSMachinePool defaulting webhook. If InstancesDistribution is
// not set, we use the AWS values for the purpose of comparison.
if mixedInstancesPolicy != nil && mixedInstancesPolicy.InstancesDistribution == nil {
mixedInstancesPolicy = machinePoolScope.AWSMachinePool.Spec.MixedInstancesPolicy.DeepCopy()
mixedInstancesPolicy.InstancesDistribution = existingASG.MixedInstancesPolicy.InstancesDistribution
}
}

if machinePoolScope.AWSMachinePool.Spec.MaxSize != existingASG.MaxSize {
return true
}

if machinePoolScope.AWSMachinePool.Spec.MinSize != existingASG.MinSize {
return true
}

if machinePoolScope.AWSMachinePool.Spec.CapacityRebalance != existingASG.CapacityRebalance {
return true
}

if !cmp.Equal(machinePoolScope.AWSMachinePool.Spec.MixedInstancesPolicy, existingASG.MixedInstancesPolicy) {
machinePoolScope.Info("got a mixed diff here", "incoming", machinePoolScope.AWSMachinePool.Spec.MixedInstancesPolicy, "existing", existingASG.MixedInstancesPolicy)
return true
if !cmp.Equal(mixedInstancesPolicy, existingASG.MixedInstancesPolicy) {
detectedAWSMachinePoolSpec.MixedInstancesPolicy = existingASG.MixedInstancesPolicy
}
}

return false
return cmp.Diff(machinePoolScope.AWSMachinePool.Spec, *detectedAWSMachinePoolSpec)
}
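
The change from a boolean `asgNeedsUpdates` to a string-returning `diffASG` can be illustrated with a small standalone sketch. The PR builds a "detected" copy of the spec and calls `cmp.Diff` from go-cmp; to stay within the standard library, this sketch swaps in hand-written field comparisons instead. `poolSpec`, `observedASG`, `distribution` and `diffSpec` are hypothetical names, not types from the project.

```go
package main

import (
	"fmt"
	"strings"
)

// distribution is a hypothetical, simplified stand-in for InstancesDistribution.
type distribution struct{ SpotAllocationStrategy string }

// poolSpec is the desired state; a nil Distribution means "accept AWS defaults".
type poolSpec struct {
	MinSize, MaxSize int32
	Distribution     *distribution
}

// observedASG is the state reported by AWS.
type observedASG struct {
	MinSize, MaxSize int32
	Distribution     *distribution
}

// diffSpec returns "" when the desired spec matches the observed ASG, and a
// human-readable description of each mismatch otherwise.
func diffSpec(desired poolSpec, observed observedASG) string {
	var b strings.Builder
	if desired.MinSize != observed.MinSize {
		fmt.Fprintf(&b, "MinSize: %d != %d\n", desired.MinSize, observed.MinSize)
	}
	if desired.MaxSize != observed.MaxSize {
		fmt.Fprintf(&b, "MaxSize: %d != %d\n", desired.MaxSize, observed.MaxSize)
	}
	// An unset Distribution is not a diff: whatever AWS defaulted to is accepted.
	if desired.Distribution != nil {
		if observed.Distribution == nil {
			fmt.Fprintln(&b, "Distribution: set != unset")
		} else if *desired.Distribution != *observed.Distribution {
			fmt.Fprintf(&b, "Distribution: %+v != %+v\n", *desired.Distribution, *observed.Distribution)
		}
	}
	return b.String()
}

func main() {
	observed := observedASG{MaxSize: 2, Distribution: &distribution{SpotAllocationStrategy: "lowest-price"}}

	// Unset distribution: the AWS-side default does not count as a change.
	fmt.Printf("unset: %q\n", diffSpec(poolSpec{MaxSize: 2}, observed))

	// Explicit, different distribution: reported as a diff that can be logged.
	desired := poolSpec{MaxSize: 2, Distribution: &distribution{SpotAllocationStrategy: "capacity-optimized"}}
	fmt.Printf("changed: %q\n", diffSpec(desired, observed))
}
```

Returning the diff text rather than `true` lets the caller log exactly which field triggered an update, and callers that only need a boolean can test for a non-empty string.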

// getOwnerMachinePool returns the MachinePool object owning the current resource.
116 changes: 106 additions & 10 deletions exp/controllers/awsmachinepool_controller_test.go
@@ -23,6 +23,7 @@ import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"
@@ -195,8 +196,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
defer teardown(t, g)
getASG(t, g)

ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any())

_, _ = reconciler.reconcileNormal(context.Background(), ms, cs, cs)

g.Expect(ms.AWSMachinePool.Finalizers).To(ContainElement(expinfrav1.MachinePoolFinalizer))
@@ -251,7 +250,8 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
setProviderID(t, g)

expectedErr := errors.New("no connection available ")
ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedErr)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(nil, expectedErr)

_, err := reconciler.reconcileNormal(context.Background(), ms, cs, cs)
g.Expect(errors.Cause(err)).To(MatchError(expectedErr))
})
@@ -297,7 +297,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
setSuspendedProcesses(t, g)
ms.AWSMachinePool.Spec.SuspendProcesses.All = true
ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
ec2Svc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(&expinfrav1.AutoScalingGroup{
Name: "name",
}, nil)
@@ -337,7 +336,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
setSuspendedProcesses(t, g)

ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
ec2Svc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(&expinfrav1.AutoScalingGroup{
Name: "name",
CurrentlySuspendProcesses: []string{"Launch", "process3"},
@@ -390,7 +388,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
MaxSize: int32(1),
Subnets: []string{"subnet1", "subnet2"}}
ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
ec2Svc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(&asg, nil).AnyTimes()
asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{"subnet2", "subnet1"}, nil).Times(1)
asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(0)
@@ -408,7 +405,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
MaxSize: int32(1),
Subnets: []string{"subnet1", "subnet2"}}
ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
ec2Svc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(&asg, nil).AnyTimes()
asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{"subnet1"}, nil).Times(1)
asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(1)
@@ -426,7 +422,6 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
MaxSize: int32(2),
Subnets: []string{}}
ec2Svc.EXPECT().ReconcileLaunchTemplate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
ec2Svc.EXPECT().ReconcileTags(gomock.Any(), gomock.Any()).Return(nil)
asgSvc.EXPECT().GetASGByName(gomock.Any()).Return(&asg, nil).AnyTimes()
asgSvc.EXPECT().SubnetIDs(gomock.Any()).Return([]string{}, nil).Times(1)
asgSvc.EXPECT().UpdateASG(gomock.Any()).Return(nil).Times(1)
@@ -473,7 +468,7 @@ func TestAWSMachinePoolReconciler(t *testing.T) {
g.Expect(err).To(BeNil())
g.Expect(buf.String()).To(ContainSubstring("Unable to locate ASG"))
g.Expect(ms.AWSMachinePool.Finalizers).To(ConsistOf(metav1.FinalizerDeleteDependents))
g.Eventually(recorder.Events).Should(Receive(ContainSubstring("NoASGFound")))
g.Eventually(recorder.Events).Should(Receive(ContainSubstring(expinfrav1.ASGNotFoundReason)))
})
t.Run("should cause AWSMachinePool to go into NotReady", func(t *testing.T) {
g := NewWithT(t)
@@ -700,6 +695,106 @@ func TestASGNeedsUpdates(t *testing.T) {
},
want: true,
},
{
name: "MixedInstancesPolicy.InstancesDistribution != asg.MixedInstancesPolicy.InstancesDistribution",
args: args{
machinePoolScope: &scope.MachinePoolScope{
MachinePool: &expclusterv1.MachinePool{
Spec: expclusterv1.MachinePoolSpec{
Replicas: pointer.Int32(1),
},
},
AWSMachinePool: &expinfrav1.AWSMachinePool{
Spec: expinfrav1.AWSMachinePoolSpec{
MaxSize: 2,
MinSize: 0,
CapacityRebalance: true,
MixedInstancesPolicy: &expinfrav1.MixedInstancesPolicy{
InstancesDistribution: &expinfrav1.InstancesDistribution{
OnDemandAllocationStrategy: expinfrav1.OnDemandAllocationStrategyPrioritized,
SpotAllocationStrategy: expinfrav1.SpotAllocationStrategyCapacityOptimized,
OnDemandBaseCapacity: aws.Int64(0),
OnDemandPercentageAboveBaseCapacity: aws.Int64(100),
},
Overrides: []expinfrav1.Overrides{
{
InstanceType: "m6a.32xlarge",
},
},
},
},
},
Logger: *logger.NewLogger(logr.Discard()),
},
existingASG: &expinfrav1.AutoScalingGroup{
DesiredCapacity: pointer.Int32(1),
MaxSize: 2,
MinSize: 0,
CapacityRebalance: true,
MixedInstancesPolicy: &expinfrav1.MixedInstancesPolicy{
InstancesDistribution: &expinfrav1.InstancesDistribution{
OnDemandAllocationStrategy: expinfrav1.OnDemandAllocationStrategyPrioritized,
SpotAllocationStrategy: expinfrav1.SpotAllocationStrategyLowestPrice,
OnDemandBaseCapacity: aws.Int64(0),
OnDemandPercentageAboveBaseCapacity: aws.Int64(100),
},
Overrides: []expinfrav1.Overrides{
{
InstanceType: "m6a.32xlarge",
},
},
},
},
},
want: true,
},
{
name: "MixedInstancesPolicy.InstancesDistribution unset",
args: args{
machinePoolScope: &scope.MachinePoolScope{
MachinePool: &expclusterv1.MachinePool{
Spec: expclusterv1.MachinePoolSpec{
Replicas: pointer.Int32(1),
},
},
AWSMachinePool: &expinfrav1.AWSMachinePool{
Spec: expinfrav1.AWSMachinePoolSpec{
MaxSize: 2,
MinSize: 0,
CapacityRebalance: true,
MixedInstancesPolicy: &expinfrav1.MixedInstancesPolicy{
Overrides: []expinfrav1.Overrides{
{
InstanceType: "m6a.32xlarge",
},
},
},
},
},
Logger: *logger.NewLogger(logr.Discard()),
},
existingASG: &expinfrav1.AutoScalingGroup{
DesiredCapacity: pointer.Int32(1),
MaxSize: 2,
MinSize: 0,
CapacityRebalance: true,
MixedInstancesPolicy: &expinfrav1.MixedInstancesPolicy{
InstancesDistribution: &expinfrav1.InstancesDistribution{
OnDemandAllocationStrategy: expinfrav1.OnDemandAllocationStrategyPrioritized,
SpotAllocationStrategy: expinfrav1.SpotAllocationStrategyLowestPrice,
OnDemandBaseCapacity: aws.Int64(0),
OnDemandPercentageAboveBaseCapacity: aws.Int64(100),
},
Overrides: []expinfrav1.Overrides{
{
InstanceType: "m6a.32xlarge",
},
},
},
},
},
want: false,
},
{
name: "SuspendProcesses != asg.SuspendProcesses",
args: args{
@@ -826,7 +921,8 @@ func TestASGNeedsUpdates(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
g.Expect(asgNeedsUpdates(tt.args.machinePoolScope, tt.args.existingASG)).To(Equal(tt.want))
diff := diffASG(tt.args.machinePoolScope, tt.args.existingASG)
g.Expect(diff != "").To(Equal(tt.want))
})
}
}