Skip to content

Commit

Permalink
Throttling Retries (#106)
Browse files Browse the repository at this point in the history
* caching and retries

* fix unit tests

* logging

* fixes

* remove caching

* avoid reconciling all spot events

* fixes

* make spot lookback configurable

* Update main.go

* bump version
  • Loading branch information
eytan-avisror authored May 16, 2020
1 parent 63e0767 commit 71e192f
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 112 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [v0.6.2-alpha2] - 2020-05-16

### Fixed

- Add retries/logging on AWS throttling (#106)
- Avoid modifications to desired instances (#108)

## [v0.6.1-alpha2] - 2020-05-13

### Fixed
Expand Down
28 changes: 0 additions & 28 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,6 @@ all: manager
test: generate fmt vet manifests
go test -v ./controllers/... -coverprofile coverage.txt

env-create:
./test-bdd/setup/setup.sh \
--region $(AWS_REGION) \
--vpc-id $(VPC_ID) \
--cluster-name $(EKS_CLUSTER) \
--ami-id $(STABLE_AMI) \
--cluster-subnets $(CLUSTER_SUBNETS) \
--node-subnets $(NODE_SUBNETS) \
--keypair-name $(KEYPAIR_NAME) \
--prefix $(PREFIX) \
--template-path ./docs \
--instancemgr-tag $(INSTANCEMGR_TAG) \
create

env-delete:
./test-bdd/setup/setup.sh \
--region $(AWS_REGION) \
--vpc-id $(VPC_ID) \
--cluster-name $(EKS_CLUSTER) \
--ami-id $(STABLE_AMI) \
--cluster-subnets $(CLUSTER_SUBNETS) \
--node-subnets $(NODE_SUBNETS) \
--keypair-name $(KEYPAIR_NAME) \
--prefix $(PREFIX) \
--template-path ./docs \
--instancemgr-tag $(INSTANCEMGR_TAG) \
delete

bdd:
go test -timeout 60m -v ./test-bdd/ --godog.stop-on-failure

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![codecov](https://codecov.io/gh/keikoproj/instance-manager/branch/master/graph/badge.svg)](https://codecov.io/gh/keikoproj/instance-manager)
[![Go Report Card](https://goreportcard.com/badge/github.com/keikoproj/instance-manager)](https://goreportcard.com/report/github.com/keikoproj/instance-manager)
[![slack](https://img.shields.io/badge/slack-join%20the%20conversation-ff69b4.svg)][SlackUrl]
![version](https://img.shields.io/badge/version-0.6.1-blue.svg?cacheSeconds=2592000)
![version](https://img.shields.io/badge/version-0.6.2-blue.svg?cacheSeconds=2592000)
> Create and manage instance groups with Kubernetes.
instance-manager simplifies the creation of worker nodes from within a Kubernetes cluster: create `InstanceGroup` objects in your cluster, and instance-manager will provision the actual machines and bootstrap them to the cluster.
Expand Down
18 changes: 10 additions & 8 deletions controllers/instancegroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ import (
// InstanceGroupReconciler reconciles an InstanceGroup object
type InstanceGroupReconciler struct {
client.Client
SpotRecommendationTime float64
Log logr.Logger
ControllerConfPath string
ControllerTemplatePath string
MaxParallel int
Auth *InstanceGroupAuthenticator
}
Expand Down Expand Up @@ -225,11 +225,17 @@ func (r *InstanceGroupReconciler) spotEventReconciler(obj handler.MapObject) []c
return nil
}

reason, exists, err := unstructured.NestedString(unstructuredObj, "reason")
if !exists || err != nil {
if reason, ok, _ := unstructured.NestedString(unstructuredObj, "reason"); ok {
if reason != kubeprovider.SpotRecommendationReason {
return nil
}
} else {
return nil
}
if reason != kubeprovider.SpotRecommendationReason {

creationTime := obj.Meta.GetCreationTimestamp()
minutesSince := time.Since(creationTime.Time).Minutes()
if minutesSince > r.SpotRecommendationTime {
return nil
}

Expand All @@ -253,10 +259,6 @@ func (r *InstanceGroupReconciler) spotEventReconciler(obj handler.MapObject) []c
instanceGroup.Name = awsprovider.GetTagValueByKey(tags, provisioners.TagInstanceGroupName)
instanceGroup.Namespace = awsprovider.GetTagValueByKey(tags, provisioners.TagInstanceGroupNamespace)
if instanceGroup.Name == "" || instanceGroup.Namespace == "" {
r.Log.Error(err,
"failed to map v1.event to scalinggroup",
"event", obj.Meta.GetName(),
)
return nil
}

Expand Down
85 changes: 50 additions & 35 deletions controllers/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"encoding/base64"
"fmt"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
Expand Down Expand Up @@ -500,41 +502,49 @@ func (w *AwsWorker) DescribeAutoscalingLaunchConfigs() (autoscaling.DescribeLaun
return *out, nil
}

func (w *AwsWorker) GetAutoscalingLaunchConfig(name string) (*autoscaling.DescribeLaunchConfigurationsOutput, error) {
out, err := w.AsgClient.DescribeLaunchConfigurations(&autoscaling.DescribeLaunchConfigurationsInput{
LaunchConfigurationNames: aws.StringSlice([]string{name}),
})
func (w *AwsWorker) GetAutoscalingLaunchConfig(name string) (*autoscaling.LaunchConfiguration, error) {
var lc *autoscaling.LaunchConfiguration
out, err := w.AsgClient.DescribeLaunchConfigurations(&autoscaling.DescribeLaunchConfigurationsInput{})
if err != nil {
return &autoscaling.DescribeLaunchConfigurationsOutput{}, err
return lc, err
}
for _, config := range out.LaunchConfigurations {
n := aws.StringValue(config.LaunchConfigurationName)
if strings.EqualFold(name, n) {
lc = config
}
}
return out, nil
return lc, nil
}

func (w *AwsWorker) GetAutoscalingGroup(name string) (*autoscaling.DescribeAutoScalingGroupsOutput, error) {
out, err := w.AsgClient.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{name}),
})
func (w *AwsWorker) GetAutoscalingGroup(name string) (*autoscaling.Group, error) {
var asg *autoscaling.Group
out, err := w.AsgClient.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{})
if err != nil {
return &autoscaling.DescribeAutoScalingGroupsOutput{}, err
return asg, err
}
for _, group := range out.AutoScalingGroups {
n := aws.StringValue(group.AutoScalingGroupName)
if strings.EqualFold(name, n) {
asg = group
}
}
return out, nil
return asg, nil
}

func GetScalingGroupTagsByName(name string, client autoscalingiface.AutoScalingAPI) ([]*autoscaling.TagDescription, error) {
tags := []*autoscaling.TagDescription{}
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{name}),
}
input := &autoscaling.DescribeAutoScalingGroupsInput{}
out, err := client.DescribeAutoScalingGroups(input)
if err != nil {
return tags, err
}

if len(out.AutoScalingGroups) < 1 {
err := errors.New("could not find scaling group")
return tags, err
for _, asg := range out.AutoScalingGroups {
n := aws.StringValue(asg.AutoScalingGroupName)
if strings.EqualFold(name, n) {
tags = asg.Tags
}
}
tags = out.AutoScalingGroups[0].Tags
return tags, nil
}

Expand Down Expand Up @@ -569,30 +579,35 @@ func GetRegion() (string, error) {

// GetAwsAsgClient returns an ASG client
func GetAwsAsgClient(region string) autoscalingiface.AutoScalingAPI {
var config aws.Config
config.Region = aws.String(region)
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
Config: config,
}))
config := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, NewRetryLogger(DefaultRetryer))
sess, err := session.NewSession(config)
if err != nil {
panic(err)
}
return autoscaling.New(sess)
}

// GetAwsEksClient returns an EKS client
func GetAwsEksClient(region string) eksiface.EKSAPI {
var config aws.Config
config.Region = aws.String(region)
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
Config: config,
}))
return eks.New(sess)
config := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, NewRetryLogger(DefaultRetryer))
sess, err := session.NewSession(config)
if err != nil {
panic(err)
}
return eks.New(sess, config)
}

// GetAwsIamClient returns an IAM client
func GetAwsIamClient(region string) iamiface.IAMAPI {
mySession := session.Must(session.NewSession())
return iam.New(mySession, aws.NewConfig().WithRegion(region))
config := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, NewRetryLogger(DefaultRetryer))
sess, err := session.NewSession(config)
if err != nil {
panic(err)
}
return iam.New(sess, config)
}

type ManagedNodeGroupReconcileState struct {
Expand Down
52 changes: 52 additions & 0 deletions controllers/providers/aws/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package aws

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
)

// RetryLogger is a request retryer that embeds the AWS SDK's
// client.DefaultRetryer and therefore inherits its retry behavior. The
// RetryRules method declared on this type additionally logs every retryable
// failure together with the backoff that was chosen.
type RetryLogger struct {
client.DefaultRetryer
}

var (
	// Compile-time check that *RetryLogger satisfies request.Retryer.
	_ request.Retryer = (*RetryLogger)(nil)

	// DefaultRetryer is the retry configuration used for the AWS clients
	// built in this package: up to 50 retries, throttle delays bounded to
	// 5s-60s, and ordinary retry delays bounded to 1s-5s.
	DefaultRetryer = client.DefaultRetryer{
		NumMaxRetries:    50,
		MinThrottleDelay: 5 * time.Second,
		MaxThrottleDelay: 60 * time.Second,
		MinRetryDelay:    1 * time.Second,
		MaxRetryDelay:    5 * time.Second,
	}
)

// NewRetryLogger wraps the supplied retryer in a RetryLogger and returns a
// pointer to the new value.
func NewRetryLogger(retryer client.DefaultRetryer) *RetryLogger {
	logger := RetryLogger{DefaultRetryer: retryer}
	return &logger
}

// RetryRules returns the delay to wait before the next attempt of r, as
// computed by the embedded DefaultRetryer, and logs the failure that
// triggered the retry at verbosity level 1.
//
// The log entry carries the "service/operation" being called, a description
// of the failure (the request error when present, otherwise the HTTP status
// of the response), and the chosen backoff duration.
func (l RetryLogger) RetryRules(r *request.Request) time.Duration {
	duration := l.DefaultRetryer.RetryRules(r)

	// Operation may be unset on a request; fall back to an empty name.
	var name string
	if r.Operation != nil {
		name = r.Operation.Name
	}
	method := fmt.Sprintf("%v/%v", r.ClientInfo.ServiceName, name)

	var err string
	switch {
	case r.Error != nil:
		err = fmt.Sprintf("%v", r.Error)
	case r.HTTPResponse != nil:
		err = fmt.Sprintf("%d %s", r.HTTPResponse.StatusCode, r.HTTPResponse.Status)
	default:
		// Neither an error nor a response is available; avoid dereferencing
		// a nil HTTPResponse, which would panic inside the retry path.
		err = "unknown"
	}
	log.V(1).Info("retryable failure", "error", err, "method", method, "backoff", duration)

	return duration
}
8 changes: 3 additions & 5 deletions controllers/provisioners/eks/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ func (ctx *EksInstanceGroupContext) CloudDiscovery() error {
return errors.Wrap(err, "failed to describe autoscaling launch configurations")
}

// there can only be 1 launch config since we're searching by name
if len(targetLaunchConfig.LaunchConfigurations) != 1 {
if targetLaunchConfig == nil {
return nil
}

var lc = targetLaunchConfig.LaunchConfigurations[0]
var lcName = aws.StringValue(lc.LaunchConfigurationName)
var lcName = aws.StringValue(targetLaunchConfig.LaunchConfigurationName)

state.SetLaunchConfiguration(lc)
state.SetLaunchConfiguration(targetLaunchConfig)
state.SetActiveLaunchConfigurationName(lcName)
status.SetActiveLaunchConfigurationName(lcName)
}
Expand Down
15 changes: 7 additions & 8 deletions controllers/provisioners/eks/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func (ctx *EksInstanceGroupContext) CreateScalingGroup() error {
}
ctx.Log.Info("created scaling group", "instancegroup", instanceGroup.GetName(), "scalinggroup", asgName)

out, err := ctx.AwsWorker.GetAutoscalingGroup(asgName)
scalingGroup, err := ctx.AwsWorker.GetAutoscalingGroup(asgName)
if err != nil {
return err
}

if len(out.AutoScalingGroups) == 1 {
state.SetScalingGroup(out.AutoScalingGroups[0])
if scalingGroup != nil {
state.SetScalingGroup(scalingGroup)
}

return nil
Expand Down Expand Up @@ -121,17 +121,16 @@ func (ctx *EksInstanceGroupContext) CreateLaunchConfiguration() error {

ctx.Log.Info("created launchconfig", "instancegroup", instanceGroup.GetName(), "launchconfig", lcName)

lcOut, err := ctx.AwsWorker.GetAutoscalingLaunchConfig(lcName)
lc, err := ctx.AwsWorker.GetAutoscalingLaunchConfig(lcName)
if err != nil {
return err
}

if len(lcOut.LaunchConfigurations) == 1 {
createdLaunchConfiguration := lcOut.LaunchConfigurations[0]
name := aws.StringValue(createdLaunchConfiguration.LaunchConfigurationName)
if lc != nil {
name := aws.StringValue(lc.LaunchConfigurationName)
status.SetActiveLaunchConfigurationName(name)
state.SetActiveLaunchConfigurationName(name)
state.SetLaunchConfiguration(createdLaunchConfiguration)
state.SetLaunchConfiguration(lc)
}

return nil
Expand Down
8 changes: 6 additions & 2 deletions controllers/provisioners/eks/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ limitations under the License.
package eks

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/keikoproj/instance-manager/api/v1alpha1"
"github.com/keikoproj/instance-manager/controllers/common"
"github.com/onsi/gomega"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestCreateLaunchConfigurationPositive(t *testing.T) {
},
})

lcName := "some-launch-config"
lcName := fmt.Sprintf("my-cluster-%v-%v-%v", ig.GetNamespace(), ig.GetName(), common.GetTimeString())
mockLaunchConfiguration := &autoscaling.LaunchConfiguration{
LaunchConfigurationName: aws.String(lcName),
}
Expand Down Expand Up @@ -120,10 +122,12 @@ func TestCreateScalingGroupPositive(t *testing.T) {
LaunchConfiguration: mockLaunchConfiguration,
})

asgName := fmt.Sprintf("my-cluster-%v-%v", ig.GetNamespace(), ig.GetName())
mockScalingGroup := &autoscaling.Group{
AutoScalingGroupName: aws.String("some-scaling-group"),
AutoScalingGroupName: aws.String(asgName),
}
asgMock.AutoScalingGroups = []*autoscaling.Group{mockScalingGroup}
asgMock.AutoScalingGroup = mockScalingGroup

err := ctx.Create()
g.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down
Loading

0 comments on commit 71e192f

Please sign in to comment.