From bde080e3424143891132a6a77d8319d67147ae12 Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Thu, 3 Nov 2022 13:45:23 -0700 Subject: [PATCH] feat: Add Native Spot Termination Handling (#2546) --- Makefile | 1 + charts/karpenter/templates/clusterrole.yaml | 2 +- charts/karpenter/templates/configmap.yaml | 1 + charts/karpenter/values.yaml | 3 + cmd/controller/main.go | 7 +- go.mod | 4 +- go.sum | 4 +- pkg/apis/config/settings/settings.go | 36 +- pkg/apis/v1alpha1/register.go | 2 + pkg/controllers/controllers.go | 17 +- pkg/controllers/interruption/controller.go | 279 +++++++++++++ .../interruption_benchmark_test.go | 340 +++++++++++++++ .../interruption/messages/noop/model.go | 31 ++ .../messages/rebalancerecommendation/model.go | 39 ++ .../rebalancerecommendation/parser.go | 44 ++ .../messages/scheduledchange/model.go | 59 +++ .../messages/scheduledchange/parser.go | 55 +++ .../messages/spotinterruption/model.go | 40 ++ .../messages/spotinterruption/parser.go | 44 ++ .../messages/statechange/model.go | 40 ++ .../messages/statechange/parser.go | 54 +++ .../interruption/messages/types.go | 58 +++ pkg/controllers/interruption/metrics.go | 71 ++++ pkg/controllers/interruption/parser.go | 93 +++++ pkg/controllers/interruption/suite_test.go | 389 ++++++++++++++++++ pkg/controllers/nodetemplate/controller.go | 108 +++++ pkg/controllers/nodetemplate/finalizer.go | 41 ++ .../nodetemplate/infrastructure.go | 160 +++++++ pkg/controllers/nodetemplate/metrics.go | 49 +++ pkg/controllers/nodetemplate/suite_test.go | 374 +++++++++++++++++ pkg/controllers/providers/eventbridge.go | 264 ++++++++++++ pkg/controllers/providers/sqs.go | 282 +++++++++++++ pkg/errors/errors.go | 38 +- pkg/fake/atomic.go | 32 +- pkg/fake/eventbridgeapi.go | 75 ++++ pkg/fake/sqsapi.go | 94 +++++ pkg/fake/types.go | 76 ++++ pkg/utils/utils.go | 13 - test/go.mod | 18 +- test/go.sum | 42 ++ .../pipeline-trigger-cron.yaml | 2 +- test/pkg/environment/aws/environment.go | 20 +- test/pkg/environment/aws/expectations.go | 43 +- test/pkg/environment/aws/setup.go | 2 +- test/pkg/environment/common/environment.go | 9 +- test/suites/common/setup.yaml | 1 + test/suites/interruption/suite_test.go | 300 ++++++++++++++ .../cloudformation.yaml | 34 ++ .../scripts/step03-iam-cloud-formation.sh | 2 +- .../scripts/step05-controller-iam.sh | 1 + 50 files changed, 3730 insertions(+), 63 deletions(-) create mode 100644 pkg/controllers/interruption/controller.go create mode 100644 pkg/controllers/interruption/interruption_benchmark_test.go create mode 100644 pkg/controllers/interruption/messages/noop/model.go create mode 100644 pkg/controllers/interruption/messages/rebalancerecommendation/model.go create mode 100644 pkg/controllers/interruption/messages/rebalancerecommendation/parser.go create mode 100644 pkg/controllers/interruption/messages/scheduledchange/model.go create mode 100644 pkg/controllers/interruption/messages/scheduledchange/parser.go create mode 100644 pkg/controllers/interruption/messages/spotinterruption/model.go create mode 100644 pkg/controllers/interruption/messages/spotinterruption/parser.go create mode 100644 pkg/controllers/interruption/messages/statechange/model.go create mode 100644 pkg/controllers/interruption/messages/statechange/parser.go create mode 100644 pkg/controllers/interruption/messages/types.go create mode 100644 pkg/controllers/interruption/metrics.go create mode 100644 pkg/controllers/interruption/parser.go create mode 100644 pkg/controllers/interruption/suite_test.go create mode 100644 pkg/controllers/nodetemplate/controller.go create mode 100644 pkg/controllers/nodetemplate/finalizer.go create mode 100644 pkg/controllers/nodetemplate/infrastructure.go create mode 100644 pkg/controllers/nodetemplate/metrics.go create mode 100644 pkg/controllers/nodetemplate/suite_test.go create mode 100644 pkg/controllers/providers/eventbridge.go create mode 100644 pkg/controllers/providers/sqs.go create mode 100644 pkg/fake/eventbridgeapi.go create mode 100644 pkg/fake/sqsapi.go create mode 100644 pkg/fake/types.go create mode 100644 test/suites/interruption/suite_test.go diff --git a/Makefile b/Makefile index 0c7d180480ca..85470c49dc8f 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,7 @@ HELM_OPTS ?= --set serviceAccount.annotations.eks\\.amazonaws\\.com/role-arn=${K --set clusterName=${CLUSTER_NAME} \ --set clusterEndpoint=${CLUSTER_ENDPOINT} \ --set aws.defaultInstanceProfile=KarpenterNodeInstanceProfile-${CLUSTER_NAME} \ + --set aws.enableInterruptionHandling=true \ --create-namespace # CR for local builds of Karpenter diff --git a/charts/karpenter/templates/clusterrole.yaml b/charts/karpenter/templates/clusterrole.yaml index e4dec34ac962..520a00d50360 100644 --- a/charts/karpenter/templates/clusterrole.yaml +++ b/charts/karpenter/templates/clusterrole.yaml @@ -31,7 +31,7 @@ rules: # Read - apiGroups: ["karpenter.k8s.aws"] resources: ["awsnodetemplates"] - verbs: ["get", "list", "watch"] + verbs: ["get", "list", "watch", "patch"] - apiGroups: ["admissionregistration.k8s.io"] resources: ["validatingwebhookconfigurations"] verbs: ["update"] diff --git a/charts/karpenter/templates/configmap.yaml b/charts/karpenter/templates/configmap.yaml index 7e504112c2ca..54a6ff9fa25e 100644 --- a/charts/karpenter/templates/configmap.yaml +++ b/charts/karpenter/templates/configmap.yaml @@ -12,6 +12,7 @@ metadata: data: "batchMaxDuration": "{{ .Values.controller.batchMaxDuration }}" "batchIdleDuration": "{{ .Values.controller.batchIdleDuration }}" + "aws.enableInterruptionHandling": "{{ .Values.aws.enableInterruptionHandling }}" {{- range $tag, $value := .Values.aws.tags }} "aws.tags.{{ $tag }}": "{{ $value }}" {{- end }} diff --git a/charts/karpenter/values.yaml b/charts/karpenter/values.yaml index 77bd0d5d4de6..2957a09dbe65 100644 --- a/charts/karpenter/values.yaml +++ b/charts/karpenter/values.yaml @@ -140,5 +140,8 @@ clusterEndpoint: "" aws: # -- The default instance profile to use when launching nodes on AWS defaultInstanceProfile: "" + # -- enableInterruptionHandling is currently in BETA and is disabled by default. Enabling interruption handling may + # require additional permissions on the controller service account. Additional permissions are outlined in the docs + enableInterruptionHandling: false # -- The global tags to use on all AWS infrastructure resources (launch templates, instances, SQS queue, etc.) tags: \ No newline at end of file diff --git a/cmd/controller/main.go b/cmd/controller/main.go index eeefdd58cc3d..76f498af8c45 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -16,7 +16,6 @@ package main import ( "github.com/samber/lo" - "k8s.io/utils/clock" awscloudprovider "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/context" @@ -46,14 +45,13 @@ func main() { lo.Must0(operator.AddHealthzCheck("cloud-provider", awsCloudProvider.LivenessProbe)) cloudProvider := metrics.Decorate(awsCloudProvider) - clusterState := state.NewCluster(operator.SettingsStore.InjectSettings(ctx), operator.Clock, operator.GetClient(), cloudProvider) operator. WithControllers(ctx, corecontrollers.NewControllers( ctx, - clock.RealClock{}, + operator.Clock, operator.GetClient(), operator.KubernetesInterface, - clusterState, + state.NewCluster(operator.SettingsStore.InjectSettings(ctx), operator.Clock, operator.GetClient(), cloudProvider), operator.EventRecorder, operator.SettingsStore, cloudProvider, @@ -61,7 +59,6 @@ func main() { WithWebhooks(corewebhooks.NewWebhooks()...). WithControllers(ctx, controllers.NewControllers( awsCtx, - clusterState, )...). WithWebhooks(webhooks.NewWebhooks()...). Start(ctx) diff --git a/go.mod b/go.mod index f85e101d4f5a..de62e77dd2a2 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,10 @@ require ( github.com/onsi/gomega v1.24.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml/v2 v2.0.5 + github.com/prometheus/client_golang v1.12.2 github.com/samber/lo v1.33.0 go.uber.org/multierr v1.8.0 + go.uber.org/zap v1.23.0 k8s.io/api v0.25.2 k8s.io/apimachinery v0.25.2 k8s.io/client-go v0.25.2 @@ -70,7 +72,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect @@ -79,7 +80,6 @@ require ( go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/automaxprocs v1.4.0 // indirect - go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.1.0 // indirect diff --git a/go.sum b/go.sum index c02753d14dd7..251ab89a0109 100644 --- a/go.sum +++ b/go.sum @@ -335,8 +335,8 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= -github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU= -github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= +github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34= +github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/pkg/apis/config/settings/settings.go b/pkg/apis/config/settings/settings.go index d8f758076d4a..2d6ead08f7d7 100644 --- a/pkg/apis/config/settings/settings.go +++ b/pkg/apis/config/settings/settings.go @@ -35,27 +35,34 @@ var Registration = &config.Registration{ DefaultData: lo.Must(defaultSettings.Data()), } -var defaultSettings = Settings{} +var defaultSettings = Settings{ + EnableInterruptionHandling: false, + Tags: map[string]string{}, +} type Settings struct { - Tags map[string]string + EnableInterruptionHandling bool `json:"aws.enableInterruptionHandling,string"` + Tags map[string]string `json:"aws.tags,omitempty"` } -func (s Settings) UnmarshalJSON(raw []byte) error { +func (s Settings) MarshalJSON() ([]byte, error) { + type internal Settings d := map[string]string{} - if err := json.Unmarshal(raw, &d); err != nil { - return err + + // Store a value of tags locally, so we can marshal the rest of the struct + tags := s.Tags + s.Tags = nil + + raw, err := json.Marshal(internal(s)) + if err != nil { + return nil, fmt.Errorf("marshaling settings, %w", err) } - if err := AsMap("aws.tags", &s.Tags)(d); err != nil { - return err + if err = json.Unmarshal(raw, &d); err != nil { + return nil, fmt.Errorf("unmarshalling settings into map, %w", err) } - return nil -} - -func (s Settings) MarshalJSON() ([]byte, error) { - d := map[string]string{} - if err := FromMap(s.Tags)("aws.tags", &d); err != nil { - return nil, err + // Rewind the tags from the map into separate values + if err = FromMap(tags)("aws.tags", &d); err != nil { + return nil, fmt.Errorf("rewinding tags into map, %w", err) } return json.Marshal(d) } @@ -74,6 +81,7 @@ func NewSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) { s := defaultSettings if err := configmap.Parse(cm.Data, + configmap.AsBool("aws.enableInterruptionHandling", &s.EnableInterruptionHandling), AsMap("aws.tags", &s.Tags), ); err != nil { // Failing to parse means that there is some error in the Settings, so we should crash diff --git a/pkg/apis/v1alpha1/register.go b/pkg/apis/v1alpha1/register.go index b2e5a6496ab0..f499b6175035 100644 --- a/pkg/apis/v1alpha1/register.go +++ b/pkg/apis/v1alpha1/register.go @@ -72,6 +72,8 @@ var ( LabelInstanceGPUCount = LabelDomain + "/instance-gpu-count" LabelInstanceGPUMemory = LabelDomain + "/instance-gpu-memory" LabelInstanceAMIID = LabelDomain + "/instance-ami-id" + + InterruptionInfrastructureFinalizer = Group + "/interruption-infrastructure" ) var ( diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index bb045c43250c..e8f55e5b7ce9 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -15,11 +15,22 @@ limitations under the License. package controllers import ( - "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/karpenter-core/pkg/operator/controller" awscontext "github.com/aws/karpenter/pkg/context" + "github.com/aws/karpenter/pkg/controllers/interruption" + "github.com/aws/karpenter/pkg/controllers/nodetemplate" + "github.com/aws/karpenter/pkg/controllers/providers" ) -func NewControllers(ctx awscontext.Context, cluster *state.Cluster) []controller.Controller { - return []controller.Controller{} +func NewControllers(ctx awscontext.Context) []controller.Controller { + sqsProvider := providers.NewSQS(sqs.New(ctx.Session)) + eventBridgeProvider := providers.NewEventBridge(eventbridge.New(ctx.Session), sqsProvider) + + return []controller.Controller{ + nodetemplate.NewController(ctx.KubeClient, sqsProvider, eventBridgeProvider), + interruption.NewController(ctx.KubeClient, ctx.Clock, ctx.EventRecorder, sqsProvider, ctx.UnavailableOfferingsCache), + } } diff --git a/pkg/controllers/interruption/controller.go b/pkg/controllers/interruption/controller.go new file mode 100644 index 000000000000..5df4449cb610 --- /dev/null +++ b/pkg/controllers/interruption/controller.go @@ -0,0 +1,279 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package interruption + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + sqsapi "github.com/aws/aws-sdk-go/service/sqs" + "github.com/samber/lo" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" + "knative.dev/pkg/logging" + "knative.dev/pkg/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter/pkg/apis" + "github.com/aws/karpenter/pkg/apis/config/settings" + "github.com/aws/karpenter/pkg/apis/v1alpha1" + "github.com/aws/karpenter/pkg/cache" + interruptionevents "github.com/aws/karpenter/pkg/controllers/interruption/events" + "github.com/aws/karpenter/pkg/controllers/interruption/messages" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/statechange" + "github.com/aws/karpenter/pkg/controllers/providers" + "github.com/aws/karpenter/pkg/errors" + "github.com/aws/karpenter/pkg/utils" + + "github.com/aws/karpenter-core/pkg/events" + "github.com/aws/karpenter-core/pkg/operator/scheme" + + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/metrics" + corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" +) + +func init() { + lo.Must0(apis.AddToScheme(scheme.Scheme)) +} + +type Action string + +const ( + CordonAndDrain Action = "CordonAndDrain" + NoAction Action = "NoAction" +) + +// Controller is an AWS interruption controller. +// It continually polls an SQS queue for events from aws.ec2 and aws.health that +// trigger node health events or node spot interruption/rebalance events. +type Controller struct { + kubeClient client.Client + clk clock.Clock + recorder events.Recorder + sqsProvider *providers.SQS + unavailableOfferingsCache *cache.UnavailableOfferings + parser *EventParser +} + +func NewController(kubeClient client.Client, clk clock.Clock, recorder events.Recorder, + sqsProvider *providers.SQS, unavailableOfferingsCache *cache.UnavailableOfferings) *Controller { + + return &Controller{ + kubeClient: kubeClient, + clk: clk, + recorder: recorder, + sqsProvider: sqsProvider, + unavailableOfferingsCache: unavailableOfferingsCache, + parser: NewEventParser(DefaultParsers...), + } +} + +func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { + if !settings.FromContext(ctx).EnableInterruptionHandling { + return reconcile.Result{RequeueAfter: time.Second * 10}, nil + } + queueExists, err := c.sqsProvider.QueueExists(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("checking queue existence, %w", err) + } + if !queueExists { + return reconcile.Result{RequeueAfter: time.Second * 10}, nil + } + sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", err) + } + if len(sqsMessages) == 0 { + return reconcile.Result{}, nil + } + instanceIDMap, err := c.makeInstanceIDMap(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("making instance id map, %w", err) + } + errs := make([]error, len(sqsMessages)) + workqueue.ParallelizeUntil(ctx, 10, len(sqsMessages), func(i int) { + msg, e := c.parseMessage(sqsMessages[i]) + if e != nil { + // If we fail to parse, then we should delete the message but still log the error + logging.FromContext(ctx).Errorf("parsing message, %v", e) + errs[i] = c.deleteMessage(ctx, sqsMessages[i]) + return + } + if e = c.handleMessage(ctx, instanceIDMap, msg); e != nil { + errs[i] = fmt.Errorf("handling message, %w", e) + return + } + errs[i] = c.deleteMessage(ctx, sqsMessages[i]) + }) + return reconcile.Result{}, multierr.Combine(errs...) +} + +func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { + return corecontroller.NewSingletonManagedBy(m). + Named("interruption") +} + +func (c *Controller) LivenessProbe(_ *http.Request) error { + return nil +} + +// parseMessage parses the passed SQS message into an internal Message interface +func (c *Controller) parseMessage(raw *sqsapi.Message) (messages.Message, error) { + // No message to parse in this case + if raw == nil || raw.Body == nil { + return nil, fmt.Errorf("message or message body is nil") + } + msg, err := c.parser.Parse(*raw.Body) + if err != nil { + return nil, fmt.Errorf("parsing sqs message, %w", err) + } + return msg, nil +} + +// handleMessage takes an action against every node involved in the message that is owned by a Provisioner +func (c *Controller) handleMessage(ctx context.Context, instanceIDMap map[string]*v1.Node, msg messages.Message) (err error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("messageKind", msg.Kind())) + receivedMessages.WithLabelValues(string(msg.Kind())).Inc() + + var failedNodeNames []string + for _, instanceID := range msg.EC2InstanceIDs() { + node, ok := instanceIDMap[instanceID] + if !ok { + continue + } + if e := c.handleNode(ctx, msg, node); e != nil { + failedNodeNames = append(failedNodeNames, node.Name) + err = multierr.Append(err, e) + } + } + messageLatency.Observe(time.Since(msg.StartTime()).Seconds()) + if err != nil { + return fmt.Errorf("failed to act on nodes [%s%s], %w", + strings.Join(lo.Slice(failedNodeNames, 0, 3), ","), + lo.Ternary(len(failedNodeNames) > 3, "...", ""), err) + } + return nil +} + +// deleteMessage removes the passed SQS message from the queue and fires a metric for the deletion +func (c *Controller) deleteMessage(ctx context.Context, msg *sqsapi.Message) error { + if err := c.sqsProvider.DeleteSQSMessage(ctx, msg); err != nil { + return fmt.Errorf("deleting sqs message, %w", err) + } + deletedMessages.Inc() + return nil +} + +// handleNode retrieves the action for the message and then performs the appropriate action against the node +func (c *Controller) handleNode(ctx context.Context, msg messages.Message, node *v1.Node) error { + action := actionForMessage(msg) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", node.Name)) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", string(action))) + + // Record metric and event for this action + c.notifyForMessage(msg, node) + actionsPerformed.WithLabelValues(string(action)).Inc() + + // Mark the offering as unavailable in the ICE cache since we got a spot interruption warning + if msg.Kind() == messages.SpotInterruptionKind { + zone := node.Labels[v1.LabelTopologyZone] + instanceType := node.Labels[v1.LabelInstanceTypeStable] + if zone != "" && instanceType != "" { + c.unavailableOfferingsCache.MarkUnavailable(ctx, string(msg.Kind()), instanceType, zone, v1alpha1.CapacityTypeSpot) + } + } + if action != NoAction { + return c.deleteNode(ctx, node) + } + return nil +} + +// deleteNode removes the node from the api-server +func (c *Controller) deleteNode(ctx context.Context, node *v1.Node) error { + if err := c.kubeClient.Delete(ctx, node); err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("deleting the node on interruption message, %w", err) + } + logging.FromContext(ctx).Infof("Deleted node from interruption message") + c.recorder.Publish(interruptionevents.NodeTerminatingOnInterruption(node)) + metrics.NodesTerminatedCounter.WithLabelValues(terminationReasonLabel).Inc() + return nil +} + +// notifyForMessage publishes the relevant alert based on the message kind +func (c *Controller) notifyForMessage(msg messages.Message, n *v1.Node) { + switch msg.Kind() { + case messages.RebalanceRecommendationKind: + c.recorder.Publish(interruptionevents.InstanceRebalanceRecommendation(n)) + + case messages.ScheduledChangeKind: + c.recorder.Publish(interruptionevents.InstanceUnhealthy(n)) + + case messages.SpotInterruptionKind: + c.recorder.Publish(interruptionevents.InstanceSpotInterrupted(n)) + + case messages.StateChangeKind: + typed := msg.(statechange.Message) + if lo.Contains([]string{"stopping", "stopped"}, typed.Detail.State) { + c.recorder.Publish(interruptionevents.InstanceStopping(n)) + } else { + c.recorder.Publish(interruptionevents.InstanceTerminating(n)) + } + + default: + } +} + +// makeInstanceIDMap builds a map between the instance id that is stored in the +// node .spec.providerID and the node name stored on the host +func (c *Controller) makeInstanceIDMap(ctx context.Context) (map[string]*v1.Node, error) { + m := map[string]*v1.Node{} + nodeList := &v1.NodeList{} + if err := c.kubeClient.List(ctx, nodeList); err != nil { + return nil, fmt.Errorf("listing nodes, %w", err) + } + for i := range nodeList.Items { + node := nodeList.Items[i] + // If this node isn't owned by a provisioner, we shouldn't handle it + if _, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; !ok { + continue + } + id, err := utils.ParseInstanceID(&node) + if err != nil || id == nil { + continue + } + m[ptr.StringValue(id)] = &node + } + return m, nil +} + +func actionForMessage(msg messages.Message) Action { + switch msg.Kind() { + case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.StateChangeKind: + return CordonAndDrain + default: + return NoAction + } +} diff --git a/pkg/controllers/interruption/interruption_benchmark_test.go b/pkg/controllers/interruption/interruption_benchmark_test.go new file mode 100644 index 000000000000..1781adfa1f20 --- /dev/null +++ b/pkg/controllers/interruption/interruption_benchmark_test.go @@ -0,0 +1,340 @@ +//go:build test_performance + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +//nolint:gosec +package interruption_test + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/avast/retry-go" + "github.com/aws/aws-sdk-go/aws" + awsclient "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/patrickmn/go-cache" + "go.uber.org/multierr" + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + clock "k8s.io/utils/clock/testing" + "knative.dev/pkg/logging" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + awscache "github.com/aws/karpenter/pkg/cache" + awscontext "github.com/aws/karpenter/pkg/context" + "github.com/aws/karpenter/pkg/controllers/interruption" + "github.com/aws/karpenter/pkg/controllers/interruption/events" + "github.com/aws/karpenter/pkg/controllers/nodetemplate" + "github.com/aws/karpenter/pkg/controllers/providers" + awstest "github.com/aws/karpenter/pkg/test" + + "github.com/aws/karpenter-core/pkg/apis/config/settings" + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider/fake" + "github.com/aws/karpenter-core/pkg/operator/injection" + "github.com/aws/karpenter-core/pkg/operator/options" + "github.com/aws/karpenter-core/pkg/test" +) + +var r = rand.New(rand.NewSource(time.Now().Unix())) + +func BenchmarkNotification15000(b *testing.B) { + benchmarkNotificationController(b, 15000) +} + +func BenchmarkNotification5000(b *testing.B) { + benchmarkNotificationController(b, 5000) +} + +func BenchmarkNotification1000(b *testing.B) { + benchmarkNotificationController(b, 1000) +} + +func BenchmarkNotification100(b *testing.B) { + benchmarkNotificationController(b, 100) +} + +//nolint:gocyclo +func benchmarkNotificationController(b *testing.B, messageCount int) { + opts := options.Options{ + AWSIsolatedVPC: true, + ClusterName: "karpenter-notification-benchmarking", + } + fakeClock = &clock.FakeClock{} + settingsStore := test.SettingsStore{ + settings.ContextKey: test.Settings(), + awssettings.ContextKey: awssettings.Settings{ + EnableInterruptionHandling: true, + }, + } + ctx = settingsStore.InjectSettings(context.Background()) + ctx = injection.WithOptions(ctx, opts) + env = test.NewEnvironment(ctx, func(e *test.Environment) {}) + env.CRDDirectoryPaths = append(env.CRDDirectoryPaths, relativeToRoot("charts/karpenter/crds")) + if err := env.Start(); err != nil { + b.Fatalf("Starting envirionment, %v", err) + } + + // Stop the test environment after the test completes + defer func() { + if err := retry.Do(func() error { + return env.Stop() + }); err != nil { + b.Fatalf("stopping test environment, %v", err) + } + }() + + providers := newProviders(env.Ctx) + if err := providers.makeInfrastructure(env.Ctx); err != nil { + b.Fatalf("standing up infrastructure, %v", err) + } + // Cleanup the infrastructure after the test completes + defer func() { + if err := retry.Do(func() error { + return providers.cleanupInfrastructure(env.Ctx) + }); err != nil { + b.Fatalf("deleting infrastructure, %v", err) + } + }() + + // Load all the fundamental components before setting up the controllers + recorder = test.NewEventRecorder() + cloudProvider = &fake.CloudProvider{} + + unavailableOfferingsCache = awscache.NewUnavailableOfferings(cache.New(awscache.UnavailableOfferingsTTL, awscontext.CacheCleanupInterval)) + + // Provision a single AWS Node Template to allow interruption reconciliation + if err := env.Client.Create(ctx, awstest.AWSNodeTemplate()); err != nil { + b.Fatalf("creating AWS node template, %v", err) + } + + // Set-up the controllers + interruptionController := interruption.NewController(env.Client, fakeClock, recorder, providers.sqsProvider, unavailableOfferingsCache) + + messages, nodes := makeDiverseMessagesAndNodes(messageCount) + + logging.FromContext(env.Ctx).Infof("Provisioning %d nodes", messageCount) + if err := provisionNodes(env.Ctx, env.Client, nodes); err != nil { + b.Fatalf("provisioning nodes, %v", err) + } + logging.FromContext(env.Ctx).Infof("Completed provisioning %d nodes", messageCount) + + logging.FromContext(env.Ctx).Infof("Provisioning %d messages into the SQS Queue", messageCount) + if err := providers.provisionMessages(env.Ctx, messages...); err != nil { + b.Fatalf("provisioning messages, %v", err) + } + logging.FromContext(env.Ctx).Infof("Completed provisioning %d messages into the SQS Queue", messageCount) + + m, err := controllerruntime.NewManager(env.Config, controllerruntime.Options{ + BaseContext: func() context.Context { return logging.WithLogger(env.Ctx, zap.NewNop().Sugar()) }, + }) + if err != nil { + b.Fatalf("creating manager, %v", err) + } + + // Registering controller with the manager + if err = interruptionController.Builder(ctx, m).Complete(interruptionController); err != nil { + b.Fatalf("registering interruption controller, %v", err) + } + + b.ResetTimer() + start := time.Now() + managerErr := make(chan error) + go func() { + logging.FromContext(env.Ctx).Infof("Starting controller manager") + managerErr <- m.Start(env.Ctx) + }() + + select { + case <-providers.monitorMessagesProcessed(env.Ctx, recorder, messageCount): + case err = <-managerErr: + b.Fatalf("running manager, %v", err) + } + + duration := time.Since(start) + b.ReportMetric(float64(messageCount), "Messages") + b.ReportMetric(duration.Seconds(), "TotalDurationInSeconds") + b.ReportMetric(float64(messageCount)/duration.Seconds(), "Messages/Second") +} + +type providerSet struct { + kubeClient client.Client + sqsProvider *providers.SQS + eventBridgeProvider *providers.EventBridge +} + +func newProviders(ctx context.Context) providerSet { + sess := session.Must(session.NewSession( + request.WithRetryer( + &aws.Config{STSRegionalEndpoint: endpoints.RegionalSTSEndpoint}, + awsclient.DefaultRetryer{NumMaxRetries: awsclient.DefaultRetryerMaxNumRetries}, + ), + )) + sqsProvider = providers.NewSQS(sqs.New(sess)) + eventBridgeProvider = providers.NewEventBridge(eventbridge.New(sess), sqsProvider) + return providerSet{ + sqsProvider: sqsProvider, + eventBridgeProvider: eventBridgeProvider, + } +} + +func (p *providerSet) makeInfrastructure(ctx context.Context) error { + infraReconciler := nodetemplate.NewInfrastructureReconciler(p.kubeClient, p.sqsProvider, p.eventBridgeProvider) + if err := infraReconciler.CreateInfrastructure(ctx); err != nil { + return fmt.Errorf("creating infrastructure, %w", err) + } + + if err := p.sqsProvider.SetQueueAttributes(ctx, map[string]*string{ + sqs.QueueAttributeNameMessageRetentionPeriod: aws.String("1200"), // 20 minutes for this test + }); err != nil { + return fmt.Errorf("updating message retention period, %w", err) + } + return nil +} + +func (p *providerSet) cleanupInfrastructure(ctx context.Context) error { + infraReconciler := nodetemplate.NewInfrastructureReconciler(p.kubeClient, p.sqsProvider, p.eventBridgeProvider) + if err := infraReconciler.DeleteInfrastructure(ctx); err != nil { + return fmt.Errorf("deleting infrastructure, %w", err) + } + return nil +} + +func (p *providerSet) provisionMessages(ctx context.Context, messages ...interface{}) error { + errs := make([]error, len(messages)) + workqueue.ParallelizeUntil(ctx, 20, len(messages), func(i int) { + _, err := p.sqsProvider.SendMessage(ctx, messages[i]) + errs[i] = err + }) + return multierr.Combine(errs...) +} + +func (p *providerSet) monitorMessagesProcessed(ctx context.Context, eventRecorder *test.EventRecorder, expectedProcessed int) <-chan struct{} { + done := make(chan struct{}) + totalProcessed := 0 + go func() { + for totalProcessed < expectedProcessed { + totalProcessed = eventRecorder.Calls(events.InstanceStopping(test.Node()).Reason) + + eventRecorder.Calls(events.InstanceTerminating(test.Node()).Reason) + + eventRecorder.Calls(events.InstanceUnhealthy(test.Node()).Reason) + + eventRecorder.Calls(events.InstanceRebalanceRecommendation(test.Node()).Reason) + + eventRecorder.Calls(events.InstanceSpotInterrupted(test.Node()).Reason) + logging.FromContext(ctx).Infof("Processed %d messages from the queue", totalProcessed) + time.Sleep(time.Second) + } + close(done) + }() + return done +} + +func provisionNodes(ctx context.Context, kubeClient client.Client, nodes []*v1.Node) error { + errs := make([]error, len(nodes)) + workqueue.ParallelizeUntil(ctx, 20, len(nodes), func(i int) { + if err := retry.Do(func() error { + return kubeClient.Create(ctx, nodes[i]) + }); err != nil { + errs[i] = fmt.Errorf("provisioning node, %w", err) + } + }) + return multierr.Combine(errs...) +} + +func makeDiverseMessagesAndNodes(count int) ([]interface{}, []*v1.Node) { + var messages []interface{} + var nodes []*v1.Node + + newMessages, newNodes := makeScheduledChangeMessagesAndNodes(count / 3) + messages = append(messages, newMessages...) + nodes = append(nodes, newNodes...) + + newMessages, newNodes = makeSpotInterruptionMessagesAndNodes(count / 3) + messages = append(messages, newMessages...) + nodes = append(nodes, newNodes...) + + newMessages, newNodes = makeStateChangeMessagesAndNodes(count-len(messages), []string{ + "stopping", "stopped", "shutting-down", "terminated", + }) + messages = append(messages, newMessages...) + nodes = append(nodes, newNodes...) + + return messages, nodes +} + +func makeScheduledChangeMessagesAndNodes(count int) ([]interface{}, []*v1.Node) { + var msgs []interface{} + var nodes []*v1.Node + for i := 0; i < count; i++ { + instanceID := makeInstanceID() + msgs = append(msgs, scheduledChangeMessage(instanceID)) + nodes = append(nodes, test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(instanceID), + })) + } + return msgs, nodes +} + +func makeStateChangeMessagesAndNodes(count int, states []string) ([]interface{}, []*v1.Node) { + var msgs []interface{} + var nodes []*v1.Node + for i := 0; i < count; i++ { + state := states[r.Intn(len(states))] + instanceID := makeInstanceID() + msgs = append(msgs, stateChangeMessage(instanceID, state)) + nodes = append(nodes, test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(instanceID), + })) + } + return msgs, nodes +} + +func makeSpotInterruptionMessagesAndNodes(count int) ([]interface{}, []*v1.Node) { + var msgs []interface{} + var nodes []*v1.Node + for i := 0; i < count; i++ { + instanceID := makeInstanceID() + msgs = append(msgs, spotInterruptionMessage(instanceID)) + nodes = append(nodes, test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(instanceID), + })) + } + return msgs, nodes +} diff --git a/pkg/controllers/interruption/messages/noop/model.go b/pkg/controllers/interruption/messages/noop/model.go new file mode 100644 index 000000000000..2afb6d86a93c --- /dev/null +++ b/pkg/controllers/interruption/messages/noop/model.go @@ -0,0 +1,31 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package noop + +import ( + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +type Message struct { + messages.Metadata +} + +func (Message) EC2InstanceIDs() []string { + return []string{} +} + +func (Message) Kind() messages.Kind { + return messages.NoOpKind +} diff --git a/pkg/controllers/interruption/messages/rebalancerecommendation/model.go b/pkg/controllers/interruption/messages/rebalancerecommendation/model.go new file mode 100644 index 000000000000..b68e9d0cf4d0 --- /dev/null +++ b/pkg/controllers/interruption/messages/rebalancerecommendation/model.go @@ -0,0 +1,39 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rebalancerecommendation + +import ( + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +// Message contains the properties defined by +// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/rebalance-recommendations.html#monitor-rebalance-recommendations +type Message struct { + messages.Metadata + + Detail Detail `json:"detail"` +} + +type Detail struct { + InstanceID string `json:"instance-id"` +} + +func (m Message) EC2InstanceIDs() []string { + return []string{m.Detail.InstanceID} +} + +func (Message) Kind() messages.Kind { + return messages.RebalanceRecommendationKind +} diff --git a/pkg/controllers/interruption/messages/rebalancerecommendation/parser.go b/pkg/controllers/interruption/messages/rebalancerecommendation/parser.go new file mode 100644 index 000000000000..75fd8f4561a1 --- /dev/null +++ b/pkg/controllers/interruption/messages/rebalancerecommendation/parser.go @@ -0,0 +1,44 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rebalancerecommendation + +import ( + "encoding/json" + "fmt" + + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +type Parser struct{} + +func (p Parser) Parse(raw string) (messages.Message, error) { + msg := Message{} + if err := json.Unmarshal([]byte(raw), &msg); err != nil { + return nil, fmt.Errorf("unmarhsalling the message as EC2InstanceRebalanceRecommendation, %w", err) + } + return msg, nil +} + +func (p Parser) Version() string { + return "0" +} + +func (p Parser) Source() string { + return "aws.ec2" +} + +func (p Parser) DetailType() string { + return "EC2 Instance Rebalance Recommendation" +} diff --git a/pkg/controllers/interruption/messages/scheduledchange/model.go b/pkg/controllers/interruption/messages/scheduledchange/model.go new file mode 100644 index 000000000000..2b726bd3bf4f --- /dev/null +++ b/pkg/controllers/interruption/messages/scheduledchange/model.go @@ -0,0 +1,59 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledchange + +import ( + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +// Message contains the properties defined in AWS EventBridge schema +// aws.health@AWSHealthEvent v0. +type Message struct { + messages.Metadata + + Detail Detail `json:"detail"` +} + +func (m Message) EC2InstanceIDs() []string { + ids := make([]string, len(m.Detail.AffectedEntities)) + for i, entity := range m.Detail.AffectedEntities { + ids[i] = entity.EntityValue + } + return ids +} + +func (Message) Kind() messages.Kind { + return messages.ScheduledChangeKind +} + +type Detail struct { + EventARN string `json:"eventArn"` + EventTypeCode string `json:"eventTypeCode"` + Service string `json:"service"` + EventDescription []EventDescription `json:"eventDescription"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + EventTypeCategory string `json:"eventTypeCategory"` + AffectedEntities []AffectedEntity `json:"affectedEntities"` +} + +type EventDescription struct { + LatestDescription string `json:"latestDescription"` + Language string `json:"language"` +} + +type AffectedEntity struct { + EntityValue string `json:"entityValue"` +} diff --git a/pkg/controllers/interruption/messages/scheduledchange/parser.go b/pkg/controllers/interruption/messages/scheduledchange/parser.go new file mode 100644 index 000000000000..9d64451a4465 --- /dev/null +++ b/pkg/controllers/interruption/messages/scheduledchange/parser.go @@ -0,0 +1,55 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledchange + +import ( + "encoding/json" + "fmt" + + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +const ( + acceptedService = "EC2" + acceptedEventTypeCategory = "scheduledChange" +) + +type Parser struct{} + +func (p Parser) Parse(raw string) (messages.Message, error) { + msg := Message{} + if err := json.Unmarshal([]byte(raw), &msg); err != nil { + return nil, fmt.Errorf("unmarhsalling the message as AWSHealthEvent, %w", err) + } + + // We ignore services and event categories that we don't watch + if msg.Detail.Service != acceptedService || + msg.Detail.EventTypeCategory != acceptedEventTypeCategory { + return nil, nil + } + return msg, nil +} + +func (p Parser) Version() string { + return "0" +} + +func (p Parser) Source() string { + return "aws.health" +} + +func (p Parser) DetailType() string { + return "AWS Health Event" +} diff --git a/pkg/controllers/interruption/messages/spotinterruption/model.go b/pkg/controllers/interruption/messages/spotinterruption/model.go new file mode 100644 index 000000000000..d0af8572c0b4 --- /dev/null +++ b/pkg/controllers/interruption/messages/spotinterruption/model.go @@ -0,0 +1,40 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spotinterruption + +import ( + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +// Message contains the properties defined in AWS EventBridge schema +// aws.ec2@EC2SpotInstanceInterruptionWarning v0. +type Message struct { + messages.Metadata + + Detail Detail `json:"detail"` +} + +type Detail struct { + InstanceID string `json:"instance-id"` + InstanceAction string `json:"instance-action"` +} + +func (m Message) EC2InstanceIDs() []string { + return []string{m.Detail.InstanceID} +} + +func (Message) Kind() messages.Kind { + return messages.SpotInterruptionKind +} diff --git a/pkg/controllers/interruption/messages/spotinterruption/parser.go b/pkg/controllers/interruption/messages/spotinterruption/parser.go new file mode 100644 index 000000000000..cc5a8172ee7a --- /dev/null +++ b/pkg/controllers/interruption/messages/spotinterruption/parser.go @@ -0,0 +1,44 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spotinterruption + +import ( + "encoding/json" + "fmt" + + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +type Parser struct{} + +func (p Parser) Parse(raw string) (messages.Message, error) { + msg := Message{} + if err := json.Unmarshal([]byte(raw), &msg); err != nil { + return nil, fmt.Errorf("unmarhsalling the message as EC2SpotInstanceInterruptionWarning, %w", err) + } + return msg, nil +} + +func (p Parser) Version() string { + return "0" +} + +func (p Parser) Source() string { + return "aws.ec2" +} + +func (p Parser) DetailType() string { + return "EC2 Spot Instance Interruption Warning" +} diff --git a/pkg/controllers/interruption/messages/statechange/model.go b/pkg/controllers/interruption/messages/statechange/model.go new file mode 100644 index 000000000000..99bac01bdb64 --- /dev/null +++ b/pkg/controllers/interruption/messages/statechange/model.go @@ -0,0 +1,40 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statechange + +import ( + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +// Message contains the properties defined in AWS EventBridge schema +// aws.ec2@EC2InstanceStateChangeNotification v1. +type Message struct { + messages.Metadata + + Detail Detail `json:"detail"` +} + +type Detail struct { + InstanceID string `json:"instance-id"` + State string `json:"state"` +} + +func (m Message) EC2InstanceIDs() []string { + return []string{m.Detail.InstanceID} +} + +func (Message) Kind() messages.Kind { + return messages.StateChangeKind +} diff --git a/pkg/controllers/interruption/messages/statechange/parser.go b/pkg/controllers/interruption/messages/statechange/parser.go new file mode 100644 index 000000000000..b5b5ebc0b932 --- /dev/null +++ b/pkg/controllers/interruption/messages/statechange/parser.go @@ -0,0 +1,54 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statechange + +import ( + "encoding/json" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/aws/karpenter/pkg/controllers/interruption/messages" +) + +var acceptedStates = sets.NewString("stopping", "stopped", "shutting-down", "terminated") + +type Parser struct{} + +func (p Parser) Parse(raw string) (messages.Message, error) { + msg := Message{} + if err := json.Unmarshal([]byte(raw), &msg); err != nil { + return nil, fmt.Errorf("unmarhsalling the message as EC2InstanceStateChangeNotification, %w", err) + } + + // We ignore states that are not in the set of states we can react to + if !acceptedStates.Has(strings.ToLower(msg.Detail.State)) { + return nil, nil + } + return msg, nil +} + +func (p Parser) Version() string { + return "0" +} + +func (p Parser) Source() string { + return "aws.ec2" +} + +func (p Parser) DetailType() string { + return "EC2 Instance State-change Notification" +} diff --git a/pkg/controllers/interruption/messages/types.go b/pkg/controllers/interruption/messages/types.go new file mode 100644 index 000000000000..45f2f49def1e --- /dev/null +++ b/pkg/controllers/interruption/messages/types.go @@ -0,0 +1,58 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package messages + +import ( + "time" +) + +type Parser interface { + Parse(string) (Message, error) + + Version() string + Source() string + DetailType() string +} + +type Message interface { + EC2InstanceIDs() []string + Kind() Kind + StartTime() time.Time +} + +type Kind string + +const ( + RebalanceRecommendationKind Kind = "RebalanceRecommendationKind" + ScheduledChangeKind Kind = "ScheduledChangeKind" + SpotInterruptionKind Kind = "SpotInterruptionKind" + StateChangeKind Kind = "StateChangeKind" + NoOpKind Kind = "NoOpKind" +) + +type Metadata struct { + Account string `json:"account"` + DetailType string `json:"detail-type"` + ID string `json:"id"` + Region string `json:"region"` + Resources []string `json:"resources"` + Source string `json:"source"` + Time time.Time `json:"time"` + Version string `json:"version"` +} + +func (m Metadata) StartTime() time.Time { + return m.Time +} diff --git a/pkg/controllers/interruption/metrics.go b/pkg/controllers/interruption/metrics.go new file mode 100644 index 000000000000..d69d66dfe332 --- /dev/null +++ b/pkg/controllers/interruption/metrics.go @@ -0,0 +1,71 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package interruption + +import ( + "github.com/prometheus/client_golang/prometheus" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/aws/karpenter-core/pkg/metrics" +) + +const ( + subsystem = "aws_interruption_controller" + messageTypeLabel = "message_type" + actionTypeLabel = "action_type" + terminationReasonLabel = "interruption" +) + +var ( + receivedMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: subsystem, + Name: "received_messages", + Help: "Count of messages received from the SQS queue. Broken down by message type and whether the message was actionable.", + }, + []string{messageTypeLabel}, + ) + deletedMessages = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: subsystem, + Name: "deleted_messages", + Help: "Count of messages deleted from the SQS queue.", + }, + ) + messageLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: subsystem, + Name: "message_latency_time_seconds", + Help: "Length of time between message creation in queue and an action taken on the message by the controller.", + Buckets: metrics.DurationBuckets(), + }, + ) + actionsPerformed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: subsystem, + Name: "actions_performed", + Help: "Number of notification actions performed. Labeled by action", + }, + []string{actionTypeLabel}, + ) +) + +func init() { + crmetrics.Registry.MustRegister(receivedMessages, deletedMessages, messageLatency, actionsPerformed) +} diff --git a/pkg/controllers/interruption/parser.go b/pkg/controllers/interruption/parser.go new file mode 100644 index 000000000000..e1775c5d1370 --- /dev/null +++ b/pkg/controllers/interruption/parser.go @@ -0,0 +1,93 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package interruption + +import ( + "encoding/json" + "fmt" + + "github.com/samber/lo" + + "github.com/aws/karpenter/pkg/controllers/interruption/messages" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/noop" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/rebalancerecommendation" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/scheduledchange" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/spotinterruption" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/statechange" +) + +type parserKey struct { + Version string + Source string + DetailType string +} + +func newParserKey(metadata messages.Metadata) parserKey { + return parserKey{ + Version: metadata.Version, + Source: metadata.Source, + DetailType: metadata.DetailType, + } +} + +func newParserKeyFromParser(p messages.Parser) parserKey { + return parserKey{ + Version: p.Version(), + Source: p.Source(), + DetailType: p.DetailType(), + } +} + +var ( + DefaultParsers = []messages.Parser{ + statechange.Parser{}, + spotinterruption.Parser{}, + scheduledchange.Parser{}, + rebalancerecommendation.Parser{}, + } +) + +type EventParser struct { + parserMap map[parserKey]messages.Parser +} + +func NewEventParser(parsers ...messages.Parser) *EventParser { + return &EventParser{ + parserMap: lo.SliceToMap(parsers, func(p messages.Parser) (parserKey, messages.Parser) { + return newParserKeyFromParser(p), p + }), + } +} + +func (p EventParser) Parse(msg string) (messages.Message, error) { + if msg == "" { + return noop.Message{}, nil + } + md := messages.Metadata{} + if err := json.Unmarshal([]byte(msg), &md); err != nil { + return noop.Message{}, fmt.Errorf("unmarshalling the message as Metadata, %w", err) + } + if parser, ok := p.parserMap[newParserKey(md)]; ok { + evt, err := parser.Parse(msg) + if err != nil { + return noop.Message{}, fmt.Errorf("parsing event message, %w", err) + } + if evt == nil { + return noop.Message{}, nil + } + return evt, nil + } + return noop.Message{Metadata: md}, nil +} diff --git a/pkg/controllers/interruption/suite_test.go b/pkg/controllers/interruption/suite_test.go new file mode 100644 index 000000000000..b7a0be3fe732 --- /dev/null +++ b/pkg/controllers/interruption/suite_test.go @@ -0,0 +1,389 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package interruption_test + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/Pallinder/go-randomdata" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/sqs" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + clock "k8s.io/utils/clock/testing" + . "knative.dev/pkg/logging/testing" + _ "knative.dev/pkg/system/testing" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/karpenter-core/pkg/apis/config/settings" + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider/fake" + "github.com/aws/karpenter-core/pkg/test" + . "github.com/aws/karpenter-core/pkg/test/expectations" + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + "github.com/aws/karpenter/pkg/apis/v1alpha1" + awscache "github.com/aws/karpenter/pkg/cache" + awscontext "github.com/aws/karpenter/pkg/context" + "github.com/aws/karpenter/pkg/controllers/interruption" + "github.com/aws/karpenter/pkg/controllers/interruption/messages" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/scheduledchange" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/spotinterruption" + "github.com/aws/karpenter/pkg/controllers/interruption/messages/statechange" + "github.com/aws/karpenter/pkg/controllers/providers" + "github.com/aws/karpenter/pkg/errors" + awsfake "github.com/aws/karpenter/pkg/fake" + awstest "github.com/aws/karpenter/pkg/test" +) + +const ( + defaultAccountID = "000000000000" + defaultInstanceID = "i-08c6fdb11e28c8c90" + defaultRegion = "us-west-2" + ec2Source = "aws.ec2" + healthSource = "aws.health" +) + +var ctx context.Context +var env *test.Environment +var nodeTemplate *v1alpha1.AWSNodeTemplate +var sqsapi *awsfake.SQSAPI +var eventbridgeapi *awsfake.EventBridgeAPI +var cloudProvider *fake.CloudProvider +var sqsProvider *providers.SQS +var eventBridgeProvider *providers.EventBridge +var unavailableOfferingsCache *awscache.UnavailableOfferings +var recorder *test.EventRecorder +var fakeClock *clock.FakeClock +var controller *interruption.Controller + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "AWSInterruption") +} + +var _ = BeforeSuite(func() { + env = test.NewEnvironment(ctx, func(e *test.Environment) { + fakeClock = &clock.FakeClock{} + cloudProvider = &fake.CloudProvider{} + + nodeTemplate = awstest.AWSNodeTemplate() + ExpectApplied(ctx, e.Client, nodeTemplate) + + recorder = test.NewEventRecorder() + unavailableOfferingsCache = awscache.NewUnavailableOfferings(cache.New(awscache.UnavailableOfferingsTTL, awscontext.CacheCleanupInterval)) + + sqsapi = &awsfake.SQSAPI{} + sqsProvider = providers.NewSQS(sqsapi) + eventbridgeapi = &awsfake.EventBridgeAPI{} + eventBridgeProvider = providers.NewEventBridge(eventbridgeapi, sqsProvider) + }) + env.CRDDirectoryPaths = append(env.CRDDirectoryPaths, relativeToRoot("charts/karpenter/crds")) + Expect(env.Start()).To(Succeed(), "Failed to start environment") +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + controller = interruption.NewController(env.Client, fakeClock, recorder, sqsProvider, unavailableOfferingsCache) + settingsStore := test.SettingsStore{ + settings.ContextKey: test.Settings(), + awssettings.ContextKey: awssettings.Settings{ + EnableInterruptionHandling: true, + }, + } + ctx = settingsStore.InjectSettings(ctx) +}) + +var _ = AfterEach(func() { + sqsapi.Reset() + eventbridgeapi.Reset() + ExpectCleanedUp(ctx, env.Client) + ExpectDeleted(ctx, env.Client, nodeTemplate) +}) + +var _ = Describe("AWSInterruption", func() { + Context("Processing Messages", func() { + It("should delete the node when receiving a spot interruption warning", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(defaultInstanceID), + }) + ExpectMessagesCreated(spotInterruptionMessage(defaultInstanceID)) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNotFound(ctx, env.Client, node) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + }) + It("should delete the node when receiving a scheduled change message", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(defaultInstanceID), + }) + ExpectMessagesCreated(scheduledChangeMessage(defaultInstanceID)) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNotFound(ctx, env.Client, node) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + }) + It("should delete the node when receiving a state change message", func() { + var nodes []*v1.Node + var messages []interface{} + for _, state := range []string{"terminated", "stopped", "stopping", "shutting-down"} { + instanceID := makeInstanceID() + nodes = append(nodes, test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(instanceID), + })) + messages = append(messages, stateChangeMessage(instanceID, state)) + } + ExpectMessagesCreated(messages...) + ExpectApplied(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNotFound(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(4)) + }) + It("should handle multiple messages that cause node deletion", func() { + var nodes []*v1.Node + var instanceIDs []string + for i := 0; i < 100; i++ { + instanceIDs = append(instanceIDs, makeInstanceID()) + nodes = append(nodes, test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(instanceIDs[len(instanceIDs)-1]), + })) + + } + + var messages []interface{} + for _, id := range instanceIDs { + messages = append(messages, spotInterruptionMessage(id)) + } + ExpectMessagesCreated(messages...) + ExpectApplied(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNotFound(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(100)) + }) + It("should not delete a node when not owned by provisioner", func() { + node := test.Node(test.NodeOptions{ + ProviderID: makeProviderID(string(uuid.NewUUID())), + }) + ExpectMessagesCreated(spotInterruptionMessage(node.Spec.ProviderID)) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNodeExists(ctx, env.Client, node.Name) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + }) + It("should delete a message when the message can't be parsed", func() { + badMessage := &sqs.Message{ + Body: aws.String(string(lo.Must(json.Marshal(map[string]string{ + "field1": "value1", + "field2": "value2", + })))), + MessageId: aws.String(string(uuid.NewUUID())), + } + + ExpectMessagesCreated(badMessage) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + }) + It("should delete a state change message when the state isn't in accepted states", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + }, + }, + ProviderID: makeProviderID(defaultInstanceID), + }) + ExpectMessagesCreated(stateChangeMessage(defaultInstanceID, "creating")) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNodeExists(ctx, env.Client, node.Name) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + }) + It("should mark the ICE cache for the offering when getting a spot interruption warning", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: "default", + v1.LabelTopologyZone: "test-zone-1a", + v1.LabelInstanceTypeStable: "t3.large", + v1alpha5.LabelCapacityType: v1alpha1.CapacityTypeSpot, + }, + }, + ProviderID: makeProviderID(defaultInstanceID), + }) + ExpectMessagesCreated(spotInterruptionMessage(defaultInstanceID)) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectNotFound(ctx, env.Client, node) + Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) + + // Expect a t3.large in test-zone-1a to be added to the ICE cache + Expect(unavailableOfferingsCache.IsUnavailable("t3.large", "test-zone-1a", v1alpha1.CapacityTypeSpot)).To(BeTrue()) + }) + }) + Context("Error Handling", func() { + It("should send an error on polling when AccessDenied", func() { + sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode(errors.AccessDeniedCode), awsfake.MaxCalls(0)) + ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + }) + It("should send an error on polling when QueueDeletedRecently", func() { + sqsapi.GetQueueURLBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDeletedRecently), awsfake.MaxCalls(0)) + ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + }) + }) +}) + +func ExpectMessagesCreated(messages ...interface{}) { + raw := lo.Map(messages, func(m interface{}, _ int) *sqs.Message { + return &sqs.Message{ + Body: aws.String(string(lo.Must(json.Marshal(m)))), + MessageId: aws.String(string(uuid.NewUUID())), + } + }) + sqsapi.ReceiveMessageBehavior.Output.Set( + &sqs.ReceiveMessageOutput{ + Messages: raw, + }, + ) +} + +func awsErrWithCode(code string) awserr.Error { + return awserr.New(code, "", fmt.Errorf("")) +} + +func spotInterruptionMessage(involvedInstanceID string) spotinterruption.Message { + return spotinterruption.Message{ + Metadata: messages.Metadata{ + Version: "0", + Account: defaultAccountID, + DetailType: "EC2 Spot Instance Interruption Warning", + ID: string(uuid.NewUUID()), + Region: defaultRegion, + Resources: []string{ + fmt.Sprintf("arn:aws:ec2:%s:instance/%s", defaultRegion, involvedInstanceID), + }, + Source: ec2Source, + Time: time.Now(), + }, + Detail: spotinterruption.Detail{ + InstanceID: involvedInstanceID, + InstanceAction: "terminate", + }, + } +} + +func stateChangeMessage(involvedInstanceID, state string) statechange.Message { + return statechange.Message{ + Metadata: messages.Metadata{ + Version: "0", + Account: defaultAccountID, + DetailType: "EC2 Instance State-change Notification", + ID: string(uuid.NewUUID()), + Region: defaultRegion, + Resources: []string{ + fmt.Sprintf("arn:aws:ec2:%s:instance/%s", defaultRegion, involvedInstanceID), + }, + Source: ec2Source, + Time: time.Now(), + }, + Detail: statechange.Detail{ + InstanceID: involvedInstanceID, + State: state, + }, + } +} + +func scheduledChangeMessage(involvedInstanceID string) scheduledchange.Message { + return scheduledchange.Message{ + Metadata: messages.Metadata{ + Version: "0", + Account: defaultAccountID, + DetailType: "AWS Health Event", + ID: string(uuid.NewUUID()), + Region: defaultRegion, + Resources: []string{ + fmt.Sprintf("arn:aws:ec2:%s:instance/%s", defaultRegion, involvedInstanceID), + }, + Source: healthSource, + Time: time.Now(), + }, + Detail: scheduledchange.Detail{ + Service: "EC2", + EventTypeCategory: "scheduledChange", + AffectedEntities: []scheduledchange.AffectedEntity{ + { + EntityValue: involvedInstanceID, + }, + }, + }, + } +} + +func makeProviderID(instanceID string) string { + return fmt.Sprintf("aws:///%s/%s", defaultRegion, instanceID) +} + +func makeInstanceID() string { + return fmt.Sprintf("i-%s", randomdata.Alphanumeric(17)) +} + +func relativeToRoot(path string) string { + _, file, _, _ := runtime.Caller(0) + manifestsRoot := filepath.Join(filepath.Dir(file), "..", "..", "..") + return filepath.Join(manifestsRoot, path) +} diff --git a/pkg/controllers/nodetemplate/controller.go b/pkg/controllers/nodetemplate/controller.go new file mode 100644 index 000000000000..533ba093f4e7 --- /dev/null +++ b/pkg/controllers/nodetemplate/controller.go @@ -0,0 +1,108 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodetemplate + +import ( + "context" + "fmt" + "net/http" + + "github.com/samber/lo" + "go.uber.org/multierr" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "knative.dev/pkg/logging" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" + "github.com/aws/karpenter-core/pkg/operator/scheme" + "github.com/aws/karpenter-core/pkg/utils/result" + "github.com/aws/karpenter/pkg/apis" + "github.com/aws/karpenter/pkg/apis/v1alpha1" + "github.com/aws/karpenter/pkg/controllers/providers" +) + +const Name = "nodetemplate" + +func init() { + lo.Must0(apis.AddToScheme(scheme.Scheme)) +} + +// Controller is the AWSNodeTemplate Controller +// It sub-reconciles by checking if there are any AWSNodeTemplates and provisions infrastructure +// if there is. If there are no templates, then it de-provisions the infrastructure. +type Controller struct { + kubeClient client.Client + finalizer *FinalizerReconciler + infrastructure *InfrastructureReconciler +} + +func NewController(kubeClient client.Client, sqsProvider *providers.SQS, eventBridgeProvider *providers.EventBridge) *Controller { + return &Controller{ + kubeClient: kubeClient, + finalizer: NewFinalizerReconciler(), + infrastructure: NewInfrastructureReconciler(kubeClient, sqsProvider, eventBridgeProvider), + } +} + +// Reconcile reconciles the AWSNodeTemplate with its sub-reconcilers +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(Name)) + stored := &v1alpha1.AWSNodeTemplate{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, stored); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + nodeTemplate := stored.DeepCopy() + var results []reconcile.Result + var errs error + for _, r := range []interface { + Reconcile(context.Context, *v1alpha1.AWSNodeTemplate) (reconcile.Result, error) + }{ + c.infrastructure, + c.finalizer, + } { + res, err := r.Reconcile(ctx, nodeTemplate) + errs = multierr.Append(errs, err) + results = append(results, res) + } + // If there are any errors, we shouldn't apply the changes, we should requeue + if errs != nil { + return reconcile.Result{}, errs + } + if !equality.Semantic.DeepEqual(nodeTemplate, stored) { + if err := c.kubeClient.Patch(ctx, nodeTemplate, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, fmt.Errorf("patching AWSNodeTemplate, %w", err) + } + } + return result.Min(results...), nil +} + +func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { + return controllerruntime. + NewControllerManagedBy(m). + Named(Name). + For(&v1alpha1.AWSNodeTemplate{}) +} + +func (c *Controller) LivenessProbe(_ *http.Request) error { + return nil +} diff --git a/pkg/controllers/nodetemplate/finalizer.go b/pkg/controllers/nodetemplate/finalizer.go new file mode 100644 index 000000000000..29fec3cbcae0 --- /dev/null +++ b/pkg/controllers/nodetemplate/finalizer.go @@ -0,0 +1,41 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodetemplate + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter/pkg/apis/v1alpha1" +) + +type FinalizerReconciler struct{} + +func NewFinalizerReconciler() *FinalizerReconciler { + return &FinalizerReconciler{} +} + +// Reconcile adds the finalizer if the nodeTemplate doesn't have it or removes the finalizer +// if the nodeTemplate is being deleted +func (r *FinalizerReconciler) Reconcile(_ context.Context, nodeTemplate *v1alpha1.AWSNodeTemplate) (reconcile.Result, error) { + if !nodeTemplate.DeletionTimestamp.IsZero() { + controllerutil.RemoveFinalizer(nodeTemplate, v1alpha1.InterruptionInfrastructureFinalizer) + return reconcile.Result{}, nil + } + controllerutil.AddFinalizer(nodeTemplate, v1alpha1.InterruptionInfrastructureFinalizer) + return reconcile.Result{}, nil +} diff --git a/pkg/controllers/nodetemplate/infrastructure.go b/pkg/controllers/nodetemplate/infrastructure.go new file mode 100644 index 000000000000..a5db9b5cb9de --- /dev/null +++ b/pkg/controllers/nodetemplate/infrastructure.go @@ -0,0 +1,160 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodetemplate + +import ( + "context" + "fmt" + "time" + + "go.uber.org/multierr" + "k8s.io/client-go/util/workqueue" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/metrics" + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + "github.com/aws/karpenter/pkg/apis/v1alpha1" + "github.com/aws/karpenter/pkg/controllers/providers" +) + +type InfrastructureReconciler struct { + kubeClient client.Client + sqsProvider *providers.SQS + eventBridgeProvider *providers.EventBridge + + lastInfrastructureReconcile time.Time // Keeps track of the last reconcile time for infra, so we don't keep calling APIs +} + +func NewInfrastructureReconciler(kubeClient client.Client, sqsProvider *providers.SQS, eventBridgeProvider *providers.EventBridge) *InfrastructureReconciler { + return &InfrastructureReconciler{ + kubeClient: kubeClient, + sqsProvider: sqsProvider, + eventBridgeProvider: eventBridgeProvider, + } +} + +// Reconcile reconciles the infrastructure based on whether interruption handling is enabled and deletes +// the infrastructure by ref-counting when the last AWSNodeTemplate is removed +func (i *InfrastructureReconciler) Reconcile(ctx context.Context, nodeTemplate *v1alpha1.AWSNodeTemplate) (reconcile.Result, error) { + if !awssettings.FromContext(ctx).EnableInterruptionHandling { + // TODO: Implement an alerting mechanism for settings updates; until then, just poll + return reconcile.Result{RequeueAfter: time.Second * 10}, nil + } + list := &v1alpha1.AWSNodeTemplateList{} + if err := i.kubeClient.List(ctx, list); err != nil { + return reconcile.Result{}, err + } + if !nodeTemplate.DeletionTimestamp.IsZero() && len(list.Items) == 1 { + if err := i.DeleteInfrastructure(ctx); err != nil { + return reconcile.Result{}, err + } + i.lastInfrastructureReconcile = time.Time{} + return reconcile.Result{}, nil + } else if len(list.Items) >= 1 { + if i.lastInfrastructureReconcile.Add(time.Minute * 5).Before(time.Now()) { + if err := i.CreateInfrastructure(ctx); err != nil { + return reconcile.Result{}, err + } + i.lastInfrastructureReconcile = time.Now() + } + } + // TODO: Implement an alerting mechanism for settings updates; until then, just poll + return reconcile.Result{RequeueAfter: time.Second * 10}, nil +} + +// CreateInfrastructure provisions an SQS queue and EventBridge rules to enable interruption handling +func (i *InfrastructureReconciler) CreateInfrastructure(ctx context.Context) error { + defer metrics.Measure(infrastructureCreateDuration)() + if err := i.ensureQueue(ctx); err != nil { + return fmt.Errorf("ensuring queue, %w", err) + } + if err := i.ensureEventBridge(ctx); err != nil { + return fmt.Errorf("ensuring eventBridge rules and targets, %w", err) + } + logging.FromContext(ctx).Infof("Ensured existence of interruption-handling infrastructure") + return nil +} + +// DeleteInfrastructure removes the infrastructure that was stood up and reconciled +// by the infrastructure controller for SQS message polling +func (i *InfrastructureReconciler) DeleteInfrastructure(ctx context.Context) error { + defer metrics.Measure(infrastructureDeleteDuration)() + funcs := []func(context.Context) error{ + i.deleteQueue, + i.deleteEventBridge, + } + errs := make([]error, len(funcs)) + workqueue.ParallelizeUntil(ctx, len(funcs), len(funcs), func(i int) { + errs[i] = funcs[i](ctx) + }) + + err := multierr.Combine(errs...) + if err != nil { + return err + } + logging.FromContext(ctx).Infof("Deprovisioned the interruption-handling infrastructure") + return nil +} + +// ensureQueue reconciles the SQS queue with the configuration prescribed by Karpenter +func (i *InfrastructureReconciler) ensureQueue(ctx context.Context) error { + // Attempt to find the queue. If we can't find it, assume it isn't created and try to create it + // If we did find it, then just set the queue attributes on the existing queue + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("queueName", i.sqsProvider.QueueName(ctx))) + queueExists, err := i.sqsProvider.QueueExists(ctx) + if err != nil { + return fmt.Errorf("checking the SQS interruption queue existence, %w", err) + } + if !queueExists { + logging.FromContext(ctx).Debugf("Queue not found, creating the SQS interruption queue") + if err := i.sqsProvider.CreateQueue(ctx); err != nil { + return fmt.Errorf("creating the SQSS interruption queue with policy, %w", err) + } + } + // Always attempt to set the queue attributes, even after creation to help set the queue policy + if err := i.sqsProvider.SetQueueAttributes(ctx, nil); err != nil { + return fmt.Errorf("setting queue attributes for interruption queue, %w", err) + } + logging.FromContext(ctx).Debugf("Reconciled the SQS interruption queue") + return nil +} + +func (i *InfrastructureReconciler) deleteQueue(ctx context.Context) error { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("queueName", i.sqsProvider.QueueName(ctx))) + if err := i.sqsProvider.DeleteQueue(ctx); err != nil { + return fmt.Errorf("deleting the the SQS interruption queue, %w", err) + } + logging.FromContext(ctx).Debugf("Deleted the SQS interruption queue") + return nil +} + +// ensureEventBridge reconciles the EventBridge rules with the configuration prescribed by Karpenter +func (i *InfrastructureReconciler) ensureEventBridge(ctx context.Context) error { + if err := i.eventBridgeProvider.CreateRules(ctx); err != nil { + return fmt.Errorf("creating EventBridge interruption rules, %w", err) + } + logging.FromContext(ctx).Debugf("Reconciled the EventBridge interruption rules") + return nil +} + +func (i *InfrastructureReconciler) deleteEventBridge(ctx context.Context) error { + if err := i.eventBridgeProvider.DeleteRules(ctx); err != nil { + return fmt.Errorf("deleting the EventBridge interruption rules, %w", err) + } + logging.FromContext(ctx).Debugf("Deleted the EventBridge interruption rules") + return nil +} diff --git a/pkg/controllers/nodetemplate/metrics.go b/pkg/controllers/nodetemplate/metrics.go new file mode 100644 index 000000000000..d59bb1589d5d --- /dev/null +++ b/pkg/controllers/nodetemplate/metrics.go @@ -0,0 +1,49 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodetemplate + +import ( + "github.com/prometheus/client_golang/prometheus" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/aws/karpenter-core/pkg/metrics" +) + +const subSystem = "nodetemplate_infrastructure" + +var ( + infrastructureCreateDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: subSystem, + Name: "create_time_seconds", + Help: "Length of time to create infrastructure.", + Buckets: metrics.DurationBuckets(), + }, + ) + infrastructureDeleteDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: subSystem, + Name: "delete_time_seconds", + Help: "Length of time to delete infrastructure.", + Buckets: metrics.DurationBuckets(), + }, + ) +) + +func init() { + crmetrics.Registry.MustRegister(infrastructureCreateDuration, infrastructureDeleteDuration) +} diff --git a/pkg/controllers/nodetemplate/suite_test.go b/pkg/controllers/nodetemplate/suite_test.go new file mode 100644 index 000000000000..82c53d8f8fd4 --- /dev/null +++ b/pkg/controllers/nodetemplate/suite_test.go @@ -0,0 +1,374 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodetemplate_test + +import ( + "context" + "fmt" + "path/filepath" + "runtime" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/sqs" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "knative.dev/pkg/logging/testing" + _ "knative.dev/pkg/system/testing" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/karpenter-core/pkg/apis/config/settings" + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/operator/injection" + "github.com/aws/karpenter-core/pkg/operator/options" + "github.com/aws/karpenter-core/pkg/test" + . "github.com/aws/karpenter-core/pkg/test/expectations" + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + "github.com/aws/karpenter/pkg/apis/v1alpha1" + "github.com/aws/karpenter/pkg/controllers/nodetemplate" + "github.com/aws/karpenter/pkg/controllers/providers" + "github.com/aws/karpenter/pkg/errors" + awsfake "github.com/aws/karpenter/pkg/fake" + awstest "github.com/aws/karpenter/pkg/test" +) + +var ctx context.Context +var env *test.Environment +var sqsapi *awsfake.SQSAPI +var sqsProvider *providers.SQS +var eventbridgeapi *awsfake.EventBridgeAPI +var eventBridgeProvider *providers.EventBridge +var controller *nodetemplate.Controller +var opts options.Options + +var defaultOpts = options.Options{ + ClusterName: "test-cluster", + ClusterEndpoint: "https://test-cluster", + AWSNodeNameConvention: string(options.IPName), + AWSENILimitedPodDensity: true, + AWSEnablePodENI: true, + AWSDefaultInstanceProfile: "test-instance-profile", +} + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "AWSNodeTemplate") +} + +var _ = BeforeSuite(func() { + env = test.NewEnvironment(ctx, func(e *test.Environment) { + opts = defaultOpts + Expect(opts.Validate()).To(Succeed(), "Failed to validate options") + e.Ctx = injection.WithOptions(e.Ctx, opts) + + sqsapi = &awsfake.SQSAPI{} + eventbridgeapi = &awsfake.EventBridgeAPI{} + sqsProvider = providers.NewSQS(sqsapi) + eventBridgeProvider = providers.NewEventBridge(eventbridgeapi, sqsProvider) + }) + env.CRDDirectoryPaths = append(env.CRDDirectoryPaths, relativeToRoot("charts/karpenter/crds")) + Expect(env.Start()).To(Succeed(), "Failed to start environment") +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + controller = nodetemplate.NewController(env.Client, sqsProvider, eventBridgeProvider) + settingsStore := test.SettingsStore{ + settings.ContextKey: test.Settings(), + awssettings.ContextKey: awssettings.Settings{ + EnableInterruptionHandling: true, + }, + } + ctx = settingsStore.InjectSettings(ctx) + ctx = injection.WithOptions(ctx, defaultOpts) +}) + +var _ = AfterEach(func() { + sqsapi.Reset() + eventbridgeapi.Reset() + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = Describe("AWSNodeTemplate", func() { + Context("Infrastructure", func() { + Context("Creation", func() { + var provider *v1alpha1.AWSNodeTemplate + BeforeEach(func() { + provider = awstest.AWSNodeTemplate() + ExpectApplied(ctx, env.Client, provider) + }) + AfterEach(func() { + ExpectFinalizersRemoved(ctx, env.Client, provider) + ExpectDeleted(ctx, env.Client, provider) + }) + It("should reconcile the queue and the eventbridge rules on start", func() { + sqsapi.GetQueueURLBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDoesNotExist), awsfake.MaxCalls(1)) // This mocks the queue not existing + + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.CreateQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.PutRuleBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.PutTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + }) + It("should throw an error but wait with backoff if we get AccessDenied", func() { + sqsapi.GetQueueURLBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDoesNotExist), awsfake.MaxCalls(0)) // This mocks the queue not existing + sqsapi.CreateQueueBehavior.Error.Set(awsErrWithCode(errors.AccessDeniedCode), awsfake.MaxCalls(0)) + eventbridgeapi.PutRuleBehavior.Error.Set(awsErrWithCode(errors.AccessDeniedExceptionCode), awsfake.MaxCalls(0)) + eventbridgeapi.PutTargetsBehavior.Error.Set(awsErrWithCode(errors.AccessDeniedExceptionCode), awsfake.MaxCalls(0)) + + ExpectReconcileFailed(ctx, controller, client.ObjectKeyFromObject(provider)) + Expect(sqsapi.CreateQueueBehavior.FailedCalls()).To(Equal(1)) + + // Simulating AccessDenied being resolved + sqsapi.CreateQueueBehavior.Reset() + eventbridgeapi.PutRuleBehavior.Reset() + eventbridgeapi.PutTargetsBehavior.Reset() + + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + Expect(sqsapi.CreateQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.PutRuleBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.PutTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + }) + It("should throw an error and wait with backoff if we get QueueDeletedRecently", func() { + sqsapi.GetQueueURLBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDoesNotExist), awsfake.MaxCalls(0)) // This mocks the queue not existing + sqsapi.CreateQueueBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDeletedRecently), awsfake.MaxCalls(0)) + + ExpectReconcileFailed(ctx, controller, client.ObjectKeyFromObject(provider)) + Expect(sqsapi.CreateQueueBehavior.FailedCalls()).To(Equal(1)) + }) + }) + Context("Deletion", func() { + It("should cleanup the infrastructure when the last AWSNodeTemplate is removed", func() { + provider := awstest.AWSNodeTemplate() + sqsapi.GetQueueURLBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDoesNotExist), awsfake.MaxCalls(1)) // This mocks the queue not existing + + ExpectApplied(ctx, env.Client, provider) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.CreateQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.PutRuleBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.PutTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + + // Set the output of ListRules to mock rule creation + eventbridgeapi.ListRulesBehavior.Output.Set(&eventbridge.ListRulesOutput{ + Rules: []*eventbridge.Rule{ + { + Name: aws.String(providers.DefaultRules[providers.ScheduledChangedRule].Name), + Arn: aws.String("test-arn1"), + }, + { + Name: aws.String(providers.DefaultRules[providers.SpotTerminationRule].Name), + Arn: aws.String("test-arn2"), + }, + { + Name: aws.String(providers.DefaultRules[providers.RebalanceRule].Name), + Arn: aws.String("test-arn3"), + }, + { + Name: aws.String(providers.DefaultRules[providers.StateChangeRule].Name), + Arn: aws.String("test-arn4"), + }, + }, + }) + eventbridgeapi.ListTagsForResourceBehavior.Output.Set(&eventbridge.ListTagsForResourceOutput{ + Tags: []*eventbridge.Tag{ + { + Key: aws.String(v1alpha5.DiscoveryTagKey), + Value: aws.String(defaultOpts.ClusterName), + }, + }, + }) + + // Delete the AWSNodeTemplate and then re-reconcile it to delete the infrastructure + Expect(env.Client.Delete(ctx, provider)).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.DeleteQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.DeleteRuleBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.RemoveTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + }) + It("should cleanup when queue is already deleted", func() { + provider := awstest.AWSNodeTemplate() + ExpectApplied(ctx, env.Client, provider) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + sqsapi.DeleteQueueBehavior.Error.Set(awsErrWithCode(sqs.ErrCodeQueueDoesNotExist), awsfake.MaxCalls(0)) + + // Set the output of ListRules to mock rule creation + eventbridgeapi.ListRulesBehavior.Output.Set(&eventbridge.ListRulesOutput{ + Rules: []*eventbridge.Rule{ + { + Name: aws.String(providers.DefaultRules[providers.ScheduledChangedRule].Name), + Arn: aws.String("test-arn1"), + }, + { + Name: aws.String(providers.DefaultRules[providers.SpotTerminationRule].Name), + Arn: aws.String("test-arn2"), + }, + { + Name: aws.String(providers.DefaultRules[providers.RebalanceRule].Name), + Arn: aws.String("test-arn3"), + }, + { + Name: aws.String(providers.DefaultRules[providers.StateChangeRule].Name), + Arn: aws.String("test-arn4"), + }, + }, + }) + eventbridgeapi.ListTagsForResourceBehavior.Output.Set(&eventbridge.ListTagsForResourceOutput{ + Tags: []*eventbridge.Tag{ + { + Key: aws.String(v1alpha5.DiscoveryTagKey), + Value: aws.String(defaultOpts.ClusterName), + }, + }, + }) + + // Delete the AWSNodeTemplate and then re-reconcile it to delete the infrastructure + Expect(env.Client.Delete(ctx, provider)).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.DeleteQueueBehavior.SuccessfulCalls()).To(Equal(0)) + Expect(eventbridgeapi.DeleteRuleBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.RemoveTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + }) + It("should cleanup with a success when a few rules aren't in list call", func() { + provider := awstest.AWSNodeTemplate() + ExpectApplied(ctx, env.Client, provider) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + // Set the output of ListRules to mock rule creation + eventbridgeapi.ListRulesBehavior.Output.Set(&eventbridge.ListRulesOutput{ + Rules: []*eventbridge.Rule{ + { + Name: aws.String(providers.DefaultRules[providers.ScheduledChangedRule].Name), + Arn: aws.String("test-arn1"), + }, + { + Name: aws.String(providers.DefaultRules[providers.SpotTerminationRule].Name), + Arn: aws.String("test-arn2"), + }, + }, + }) + eventbridgeapi.ListTagsForResourceBehavior.Output.Set(&eventbridge.ListTagsForResourceOutput{ + Tags: []*eventbridge.Tag{ + { + Key: aws.String(v1alpha5.DiscoveryTagKey), + Value: aws.String(defaultOpts.ClusterName), + }, + }, + }) + + // Delete the AWSNodeTemplate and then re-reconcile it to delete the infrastructure + Expect(env.Client.Delete(ctx, provider)).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.DeleteQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.RemoveTargetsBehavior.SuccessfulCalls()).To(Equal(2)) + Expect(eventbridgeapi.DeleteRuleBehavior.SuccessfulCalls()).To(Equal(2)) + }) + It("should cleanup with a success when getting not found errors", func() { + provider := awstest.AWSNodeTemplate() + ExpectApplied(ctx, env.Client, provider) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + eventbridgeapi.RemoveTargetsBehavior.Error.Set(awsErrWithCode((&eventbridge.ResourceNotFoundException{}).Code()), awsfake.MaxCalls(0)) + eventbridgeapi.DeleteRuleBehavior.Error.Set(awsErrWithCode((&eventbridge.ResourceNotFoundException{}).Code()), awsfake.MaxCalls(0)) + + // Delete the AWSNodeTemplate and then re-reconcile it to delete the infrastructure + Expect(env.Client.Delete(ctx, provider)).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provider)) + + Expect(sqsapi.DeleteQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.RemoveTargetsBehavior.SuccessfulCalls()).To(Equal(0)) + Expect(eventbridgeapi.DeleteRuleBehavior.SuccessfulCalls()).To(Equal(0)) + }) + It("should only attempt to delete the infrastructure when the last node template is removed", func() { + var nodeTemplates []*v1alpha1.AWSNodeTemplate + for i := 0; i < 10; i++ { + p := awstest.AWSNodeTemplate() + nodeTemplates = append(nodeTemplates, p) + ExpectApplied(ctx, env.Client, p) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(p)) + } + + for i := 0; i < len(nodeTemplates)-1; i++ { + Expect(env.Client.Delete(ctx, nodeTemplates[i])).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeTemplates[i])) + } + + // It shouldn't attempt to delete at this point + Expect(sqsapi.DeleteQueueBehavior.Calls()).To(Equal(0)) + Expect(eventbridgeapi.RemoveTargetsBehavior.Calls()).To(Equal(0)) + Expect(eventbridgeapi.DeleteRuleBehavior.Calls()).To(Equal(0)) + + // Set the output of ListRules to mock rule creation + eventbridgeapi.ListRulesBehavior.Output.Set(&eventbridge.ListRulesOutput{ + Rules: []*eventbridge.Rule{ + { + Name: aws.String(providers.DefaultRules[providers.ScheduledChangedRule].Name), + Arn: aws.String("test-arn1"), + }, + { + Name: aws.String(providers.DefaultRules[providers.SpotTerminationRule].Name), + Arn: aws.String("test-arn2"), + }, + { + Name: aws.String(providers.DefaultRules[providers.RebalanceRule].Name), + Arn: aws.String("test-arn3"), + }, + { + Name: aws.String(providers.DefaultRules[providers.StateChangeRule].Name), + Arn: aws.String("test-arn4"), + }, + }, + }) + eventbridgeapi.ListTagsForResourceBehavior.Output.Set(&eventbridge.ListTagsForResourceOutput{ + Tags: []*eventbridge.Tag{ + { + Key: aws.String(v1alpha5.DiscoveryTagKey), + Value: aws.String(defaultOpts.ClusterName), + }, + }, + }) + + // Last AWSNodeTemplate, so now it should delete it + Expect(env.Client.Delete(ctx, nodeTemplates[len(nodeTemplates)-1])).To(Succeed()) + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeTemplates[len(nodeTemplates)-1])) + + Expect(sqsapi.DeleteQueueBehavior.SuccessfulCalls()).To(Equal(1)) + Expect(eventbridgeapi.RemoveTargetsBehavior.SuccessfulCalls()).To(Equal(4)) + Expect(eventbridgeapi.DeleteRuleBehavior.SuccessfulCalls()).To(Equal(4)) + }) + }) + }) +}) + +func awsErrWithCode(code string) awserr.Error { + return awserr.New(code, "", fmt.Errorf("")) +} + +func relativeToRoot(path string) string { + _, file, _, _ := runtime.Caller(0) + manifestsRoot := filepath.Join(filepath.Dir(file), "..", "..", "..") + return filepath.Join(manifestsRoot, path) +} diff --git a/pkg/controllers/providers/eventbridge.go b/pkg/controllers/providers/eventbridge.go new file mode 100644 index 000000000000..5747b4f3cc0d --- /dev/null +++ b/pkg/controllers/providers/eventbridge.go @@ -0,0 +1,264 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/eventbridge/eventbridgeiface" + "github.com/samber/lo" + "go.uber.org/multierr" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/util/workqueue" + + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/operator/injection" + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + awserrors "github.com/aws/karpenter/pkg/errors" +) + +const ( + ScheduledChangedRule = "ScheduledChangeRule" + SpotTerminationRule = "SpotTerminationRule" + RebalanceRule = "RebalanceRule" + StateChangeRule = "StateChangeRule" +) + +var DefaultRules = map[string]Rule{ + ScheduledChangedRule: { + Name: fmt.Sprintf("Karpenter-%s-%s", ScheduledChangedRule, rand.String(64-len(fmt.Sprintf("Karpenter-%s-", ScheduledChangedRule)))), + Pattern: Pattern{ + Source: []string{"aws.health"}, + DetailType: []string{"AWS Health Event"}, + }, + }, + SpotTerminationRule: { + Name: fmt.Sprintf("Karpenter-%s-%s", SpotTerminationRule, rand.String(64-len(fmt.Sprintf("Karpenter-%s-", SpotTerminationRule)))), + Pattern: Pattern{ + Source: []string{"aws.ec2"}, + DetailType: []string{"EC2 Spot Instance Interruption Warning"}, + }, + }, + RebalanceRule: { + Name: fmt.Sprintf("Karpenter-%s-%s", RebalanceRule, rand.String(64-len(fmt.Sprintf("Karpenter-%s-", RebalanceRule)))), + Pattern: Pattern{ + Source: []string{"aws.ec2"}, + DetailType: []string{"EC2 Instance Rebalance Recommendation"}, + }, + }, + StateChangeRule: { + Name: fmt.Sprintf("Karpenter-%s-%s", StateChangeRule, rand.String(64-len(fmt.Sprintf("Karpenter-%s-", StateChangeRule)))), + Pattern: Pattern{ + Source: []string{"aws.ec2"}, + DetailType: []string{"EC2 Instance State-change Notification"}, + }, + }, +} + +type Rule struct { + Name string + Pattern Pattern + Target Target +} + +const QueueTargetID = "KarpenterEventQueue" + +func (er Rule) addQueueTarget(queueARN string) Rule { + er.Target = Target{ + ID: QueueTargetID, + ARN: queueARN, + } + return er +} + +type Target struct { + ID string + ARN string +} + +type Pattern struct { + Source []string `json:"source,omitempty"` + DetailType []string `json:"detail-type,omitempty"` +} + +func (ep Pattern) Serialize() []byte { + return lo.Must(json.Marshal(ep)) +} + +type EventBridge struct { + client eventbridgeiface.EventBridgeAPI + sqsProvider *SQS +} + +func NewEventBridge(eb eventbridgeiface.EventBridgeAPI, sqsProvider *SQS) *EventBridge { + return &EventBridge{ + client: eb, + sqsProvider: sqsProvider, + } +} + +func (eb *EventBridge) CreateRules(ctx context.Context) error { + queueARN, err := eb.sqsProvider.queueARN.TryGet(ctx) + if err != nil { + return fmt.Errorf("resolving queue arn, %w", err) + } + existingRules, err := eb.DiscoverRules(ctx) + if err != nil { + return fmt.Errorf("discovering existing rules, %w", err) + } + rules := lo.MapToSlice(eb.mergeRules(existingRules), func(_ string, r Rule) Rule { + return r.addQueueTarget(queueARN) + }) + errs := make([]error, len(rules)) + workqueue.ParallelizeUntil(ctx, len(rules), len(rules), func(i int) { + _, err := eb.client.PutRuleWithContext(ctx, &eventbridge.PutRuleInput{ + Name: aws.String(rules[i].Name), + EventPattern: aws.String(string(rules[i].Pattern.Serialize())), + Tags: eb.getTags(ctx), + }) + if err != nil { + errs[i] = multierr.Append(errs[i], err) + } + _, err = eb.client.PutTargetsWithContext(ctx, &eventbridge.PutTargetsInput{ + Rule: aws.String(rules[i].Name), + Targets: []*eventbridge.Target{ + { + Id: aws.String(rules[i].Target.ID), + Arn: aws.String(rules[i].Target.ARN), + }, + }, + }) + if err != nil { + errs[i] = multierr.Append(errs[i], err) + } + }) + return multierr.Combine(errs...) +} + +func (eb *EventBridge) DiscoverRules(ctx context.Context) (map[string]Rule, error) { + m := map[string]Rule{} + output, err := eb.client.ListRulesWithContext(ctx, &eventbridge.ListRulesInput{ + NamePrefix: aws.String("Karpenter-"), + }) + if err != nil { + return nil, fmt.Errorf("listing rules, %w", err) + } + for _, rule := range output.Rules { + out, err := eb.client.ListTagsForResourceWithContext(ctx, &eventbridge.ListTagsForResourceInput{ + ResourceARN: rule.Arn, + }) + // If we get access denied, that means the tag-based policy didn't allow us to get the tags from the rule + // which means it isn't a rule that we created for this cluster anyways + if err != nil && !awserrors.IsAccessDenied(err) { + return nil, fmt.Errorf("describing rules, %w", err) + } + for _, tag := range out.Tags { + if aws.StringValue(tag.Key) == v1alpha5.DiscoveryTagKey && + aws.StringValue(tag.Value) == injection.GetOptions(ctx).ClusterName { + + // If we succeed to parse the rule name, we should store it by its rule type + t, err := parseRuleName(aws.StringValue(rule.Name)) + if err == nil { + m[t] = Rule{ + Name: aws.StringValue(rule.Name), + } + } + } + } + } + return m, nil +} + +func (eb *EventBridge) DeleteRules(ctx context.Context) error { + out, err := eb.DiscoverRules(ctx) + if err != nil { + return fmt.Errorf("discovering existing rules, %w", err) + } + rules := lo.Values(out) + errs := make([]error, len(rules)) + workqueue.ParallelizeUntil(ctx, len(rules), len(rules), func(i int) { + targetInput := &eventbridge.RemoveTargetsInput{ + Ids: []*string{aws.String(QueueTargetID)}, + Rule: aws.String(rules[i].Name), + } + _, err := eb.client.RemoveTargetsWithContext(ctx, targetInput) + if err != nil && !awserrors.IsNotFound(err) { + errs[i] = err + return + } + ruleInput := &eventbridge.DeleteRuleInput{ + Name: aws.String(rules[i].Name), + } + _, err = eb.client.DeleteRuleWithContext(ctx, ruleInput) + if err != nil && !awserrors.IsNotFound(err) { + errs[i] = err + } + }) + return multierr.Combine(errs...) +} + +// mergeRules merges the existing rules with the default rules based on the rule type +func (eb *EventBridge) mergeRules(existing map[string]Rule) map[string]Rule { + rules := lo.Assign(DefaultRules) + for k, rule := range rules { + if existingRule, ok := existing[k]; ok { + rule.Name = existingRule.Name + rules[k] = rule + } + } + return rules +} + +func (eb *EventBridge) getTags(ctx context.Context) []*eventbridge.Tag { + return append( + []*eventbridge.Tag{ + { + Key: aws.String(v1alpha5.DiscoveryTagKey), + Value: aws.String(injection.GetOptions(ctx).ClusterName), + }, + { + Key: aws.String(v1alpha5.ManagedByTagKey), + Value: aws.String(injection.GetOptions(ctx).ClusterName), + }, + }, + lo.MapToSlice(awssettings.FromContext(ctx).Tags, func(k, v string) *eventbridge.Tag { + return &eventbridge.Tag{ + Key: aws.String(k), + Value: aws.String(v), + } + })..., + ) +} + +// parseRuleName parses out the rule type based on the expected naming convention for rules +// provisioned by Karpenter +func parseRuleName(raw string) (string, error) { + r := regexp.MustCompile(`Karpenter-(?P.*)-.*`) + matches := r.FindStringSubmatch(raw) + if matches == nil { + return "", fmt.Errorf("parsing rule name, %s", raw) + } + for i, name := range r.SubexpNames() { + if name == "RuleType" { + return matches[i], nil + } + } + return "", fmt.Errorf("parsing rule name, %s", raw) +} diff --git a/pkg/controllers/providers/sqs.go b/pkg/controllers/providers/sqs.go new file mode 100644 index 000000000000..c22ae98afa65 --- /dev/null +++ b/pkg/controllers/providers/sqs.go @@ -0,0 +1,282 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/samber/lo" + + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter-core/pkg/operator/injection" + "github.com/aws/karpenter-core/pkg/utils/atomic" + awssettings "github.com/aws/karpenter/pkg/apis/config/settings" + awserrors "github.com/aws/karpenter/pkg/errors" +) + +type queuePolicy struct { + Version string `json:"Version"` + ID string `json:"Id"` + Statement []queuePolicyStatement `json:"Statement"` +} + +type queuePolicyStatement struct { + Effect string `json:"Effect"` + Principal principal `json:"Principal"` + Action []string `json:"Action"` + Resource string `json:"Resource"` +} + +type principal struct { + Service []string `json:"Service"` +} + +type SQS struct { + client sqsiface.SQSAPI + + queueURL atomic.Lazy[string] + queueARN atomic.Lazy[string] +} + +func NewSQS(client sqsiface.SQSAPI) *SQS { + provider := &SQS{ + client: client, + } + provider.queueURL.Resolve = func(ctx context.Context) (string, error) { + input := &sqs.GetQueueUrlInput{ + QueueName: aws.String(provider.QueueName(ctx)), + } + ret, err := provider.client.GetQueueUrlWithContext(ctx, input) + if err != nil { + return "", fmt.Errorf("fetching queue url, %w", err) + } + return aws.StringValue(ret.QueueUrl), nil + } + provider.queueARN.Resolve = func(ctx context.Context) (string, error) { + queueURL, err := provider.queueURL.TryGet(ctx) + if err != nil { + return "", fmt.Errorf("discovering queue url, %w", err) + } + input := &sqs.GetQueueAttributesInput{ + AttributeNames: aws.StringSlice([]string{sqs.QueueAttributeNameQueueArn}), + QueueUrl: aws.String(queueURL), + } + ret, err := provider.client.GetQueueAttributesWithContext(ctx, input) + if err != nil { + return "", fmt.Errorf("fetching queue arn, %w", err) + } + if arn, ok := ret.Attributes[sqs.QueueAttributeNameQueueArn]; ok { + return aws.StringValue(arn), nil + } + return "", fmt.Errorf("queue arn not found in queue attributes response") + } + return provider +} + +func (s *SQS) QueueName(ctx context.Context) string { + return lo.Substring(injection.GetOptions(ctx).ClusterName, 0, 80) +} + +func (s *SQS) CreateQueue(ctx context.Context) error { + input := &sqs.CreateQueueInput{ + QueueName: aws.String(s.QueueName(ctx)), + Tags: s.getTags(ctx), + } + result, err := s.client.CreateQueueWithContext(ctx, input) + if err != nil { + return fmt.Errorf("creating sqs queue, %w", err) + } + s.queueURL.Set(aws.StringValue(result.QueueUrl)) + return nil +} + +func (s *SQS) SetQueueAttributes(ctx context.Context, attributeOverrides map[string]*string) error { + queueURL, err := s.DiscoverQueueURL(ctx) + if err != nil { + return fmt.Errorf("fetching queue url, %w", err) + } + attributes, err := s.getQueueAttributes(ctx) + if err != nil { + return fmt.Errorf("marshaling queue attributes, %w", err) + } + if attributeOverrides != nil { + attributes = lo.Assign(attributes, attributeOverrides) + } + setQueueAttributesInput := &sqs.SetQueueAttributesInput{ + Attributes: attributes, + QueueUrl: aws.String(queueURL), + } + _, err = s.client.SetQueueAttributesWithContext(ctx, setQueueAttributesInput) + if err != nil { + return fmt.Errorf("setting queue attributes, %w", err) + } + return nil +} + +func (s *SQS) QueueExists(ctx context.Context) (bool, error) { + _, err := s.queueURL.TryGet(ctx, atomic.IgnoreCacheOption) + if err != nil { + if awserrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} + +func (s *SQS) DiscoverQueueURL(ctx context.Context) (string, error) { + return s.queueURL.TryGet(ctx) +} + +func (s *SQS) DiscoverQueueARN(ctx context.Context) (string, error) { + return s.queueARN.TryGet(ctx) +} + +func (s *SQS) GetSQSMessages(ctx context.Context) ([]*sqs.Message, error) { + queueURL, err := s.DiscoverQueueURL(ctx) + if err != nil { + return nil, fmt.Errorf("fetching queue url, %w", err) + } + + input := &sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(20), // Seconds + WaitTimeSeconds: aws.Int64(20), // Seconds, maximum for long polling + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: aws.String(queueURL), + } + + result, err := s.client.ReceiveMessageWithContext(ctx, input) + if err != nil { + return nil, fmt.Errorf("receiving sqs messages, %w", err) + } + + return result.Messages, nil +} + +func (s *SQS) SendMessage(ctx context.Context, body interface{}) (string, error) { + raw, err := json.Marshal(body) + if err != nil { + return "", fmt.Errorf("marshaling the passed body as json, %w", err) + } + queueURL, err := s.DiscoverQueueURL(ctx) + if err != nil { + return "", fmt.Errorf("fetching queue url, %w", err) + } + input := &sqs.SendMessageInput{ + MessageBody: aws.String(string(raw)), + QueueUrl: aws.String(queueURL), + } + result, err := s.client.SendMessage(input) + if err != nil { + return "", fmt.Errorf("sending messages to sqs queue, %w", err) + } + return aws.StringValue(result.MessageId), nil +} + +func (s *SQS) DeleteSQSMessage(ctx context.Context, msg *sqs.Message) error { + queueURL, err := s.DiscoverQueueURL(ctx) + if err != nil { + return fmt.Errorf("failed fetching queue url, %w", err) + } + + input := &sqs.DeleteMessageInput{ + QueueUrl: aws.String(queueURL), + ReceiptHandle: msg.ReceiptHandle, + } + + _, err = s.client.DeleteMessageWithContext(ctx, input) + if err != nil { + return fmt.Errorf("deleting messages from sqs queue, %w", err) + } + return nil +} + +func (s *SQS) DeleteQueue(ctx context.Context) error { + queueURL, err := s.DiscoverQueueURL(ctx) + if err != nil { + if awserrors.IsNotFound(err) || awserrors.IsAccessDenied(err) { + return nil + } + return fmt.Errorf("fetching queue url, %w", err) + } + + input := &sqs.DeleteQueueInput{ + QueueUrl: aws.String(queueURL), + } + _, err = s.client.DeleteQueueWithContext(ctx, input) + if err != nil && !awserrors.IsNotFound(err) { + return fmt.Errorf("deleting sqs queue, %w", err) + } + return nil +} + +func (s *SQS) getQueueAttributes(ctx context.Context) (map[string]*string, error) { + raw, err := s.getQueuePolicy(ctx) + if err != nil { + return nil, fmt.Errorf("marshaling queue policy, %w", err) + } + policy := lo.Must(json.Marshal(raw)) + return map[string]*string{ + sqs.QueueAttributeNameMessageRetentionPeriod: aws.String("300"), + sqs.QueueAttributeNamePolicy: aws.String(string(policy)), + }, nil +} + +func (s *SQS) getQueuePolicy(ctx context.Context) (*queuePolicy, error) { + queueARN, err := s.DiscoverQueueARN(ctx) + if err != nil { + return nil, fmt.Errorf("retrieving queue arn for queue policy, %w", err) + } + return &queuePolicy{ + Version: "2008-10-17", + ID: "EC2NotificationPolicy", + Statement: []queuePolicyStatement{ + { + Effect: "Allow", + Principal: principal{ + Service: []string{ + "events.amazonaws.com", + "sqs.amazonaws.com", + }, + }, + Action: []string{"sqs:SendMessage"}, + Resource: queueARN, + }, + }, + }, nil +} + +func (s *SQS) getTags(ctx context.Context) map[string]*string { + return lo.Assign( + lo.MapEntries(awssettings.FromContext(ctx).Tags, func(k, v string) (string, *string) { + return k, lo.ToPtr(v) + }), + map[string]*string{ + v1alpha5.DiscoveryTagKey: aws.String(injection.GetOptions(ctx).ClusterName), + v1alpha5.ManagedByTagKey: aws.String(injection.GetOptions(ctx).ClusterName), + }, + ) +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 3d1d7df113ad..79c891c4defc 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -19,27 +19,37 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" - "github.com/samber/lo" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/sqs" + "k8s.io/apimachinery/pkg/util/sets" ) const ( launchTemplateNotFoundCode = "InvalidLaunchTemplateName.NotFoundException" + AccessDeniedCode = "AccessDenied" + AccessDeniedExceptionCode = "AccessDeniedException" ) var ( // This is not an exhaustive list, add to it as needed - notFoundErrorCodes = []string{ + notFoundErrorCodes = sets.NewString( "InvalidInstanceID.NotFound", launchTemplateNotFoundCode, - } + sqs.ErrCodeQueueDoesNotExist, + (&eventbridge.ResourceNotFoundException{}).Code(), + ) // unfulfillableCapacityErrorCodes signify that capacity is temporarily unable to be launched - unfulfillableCapacityErrorCodes = []string{ + unfulfillableCapacityErrorCodes = sets.NewString( "InsufficientInstanceCapacity", "MaxSpotInstanceCountExceeded", "VcpuLimitExceeded", "UnfulfillableCapacity", "Unsupported", - } + ) + accessDeniedErrorCodes = sets.NewString( + AccessDeniedCode, + AccessDeniedExceptionCode, + ) ) type InstanceTerminatedError struct { @@ -67,7 +77,21 @@ func IsNotFound(err error) bool { } var awsError awserr.Error if errors.As(err, &awsError) { - return lo.Contains(notFoundErrorCodes, awsError.Code()) + return notFoundErrorCodes.Has(awsError.Code()) + } + return false +} + +// IsAccessDenied returns true if the error is an AWS error (even if it's +// wrapped) and is a known to mean "access denied" (as opposed to a more +// serious or unexpected error) +func IsAccessDenied(err error) bool { + if err == nil { + return false + } + var awsError awserr.Error + if errors.As(err, &awsError) { + return accessDeniedErrorCodes.Has(awsError.Code()) } return false } @@ -76,7 +100,7 @@ func IsNotFound(err error) bool { // capacity is temporarily unavailable for launching. // This could be due to account limits, insufficient ec2 capacity, etc. func IsUnfulfillableCapacity(err *ec2.CreateFleetError) bool { - return lo.Contains(unfulfillableCapacityErrorCodes, *err.ErrorCode) + return unfulfillableCapacityErrorCodes.Has(*err.ErrorCode) } func IsLaunchTemplateNotFound(err error) bool { diff --git a/pkg/fake/atomic.go b/pkg/fake/atomic.go index e5958a3e46ad..685496759b91 100644 --- a/pkg/fake/atomic.go +++ b/pkg/fake/atomic.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/json" "log" + "math" "sync" ) @@ -71,12 +72,17 @@ func (a *AtomicPtr[T]) Reset() { type AtomicError struct { mu sync.Mutex err error + + calls int + maxCalls int } func (e *AtomicError) Reset() { e.mu.Lock() defer e.mu.Unlock() e.err = nil + e.calls = 0 + e.maxCalls = 0 } func (e *AtomicError) IsNil() bool { @@ -85,16 +91,40 @@ func (e *AtomicError) IsNil() bool { return e.err == nil } +// Get is equivalent to the error being called, so we increase +// number of calls in this function func (e *AtomicError) Get() error { e.mu.Lock() defer e.mu.Unlock() + if e.calls >= e.maxCalls { + return nil + } + e.calls++ return e.err } -func (e *AtomicError) Set(err error) { +func (e *AtomicError) Set(err error, opts ...AtomicErrorOption) { e.mu.Lock() defer e.mu.Unlock() e.err = err + for _, opt := range opts { + opt(e) + } + if e.maxCalls == 0 { + e.maxCalls = 1 + } +} + +type AtomicErrorOption func(atomicError *AtomicError) + +func MaxCalls(maxCalls int) AtomicErrorOption { + // Setting to 0 is equivalent to allowing infinite errors to API + if maxCalls <= 0 { + maxCalls = math.MaxInt + } + return func(e *AtomicError) { + e.maxCalls = maxCalls + } } // AtomicPtrSlice exposes a slice of a pointer type in a race-free manner. The interface is just enough to replace the diff --git a/pkg/fake/eventbridgeapi.go b/pkg/fake/eventbridgeapi.go new file mode 100644 index 000000000000..c225b91d1b66 --- /dev/null +++ b/pkg/fake/eventbridgeapi.go @@ -0,0 +1,75 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "context" + + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/eventbridge" + "github.com/aws/aws-sdk-go/service/eventbridge/eventbridgeiface" +) + +// EventBridgeBehavior must be reset between tests otherwise tests will +// pollute each other. +type EventBridgeBehavior struct { + PutRuleBehavior MockedFunction[eventbridge.PutRuleInput, eventbridge.PutRuleOutput] + PutTargetsBehavior MockedFunction[eventbridge.PutTargetsInput, eventbridge.PutTargetsOutput] + ListRulesBehavior MockedFunction[eventbridge.ListRulesInput, eventbridge.ListRulesOutput] + ListTagsForResourceBehavior MockedFunction[eventbridge.ListTagsForResourceInput, eventbridge.ListTagsForResourceOutput] + DeleteRuleBehavior MockedFunction[eventbridge.DeleteRuleInput, eventbridge.DeleteRuleOutput] + RemoveTargetsBehavior MockedFunction[eventbridge.RemoveTargetsInput, eventbridge.RemoveTargetsOutput] +} + +type EventBridgeAPI struct { + eventbridgeiface.EventBridgeAPI + EventBridgeBehavior +} + +// Reset must be called between tests otherwise tests will pollute +// each other. +func (eb *EventBridgeAPI) Reset() { + eb.PutRuleBehavior.Reset() + eb.PutTargetsBehavior.Reset() + eb.ListRulesBehavior.Reset() + eb.DeleteRuleBehavior.Reset() + eb.RemoveTargetsBehavior.Reset() +} + +// TODO: Create a dummy rule ARN for the default that is returned from this function +func (eb *EventBridgeAPI) PutRuleWithContext(_ context.Context, input *eventbridge.PutRuleInput, _ ...request.Option) (*eventbridge.PutRuleOutput, error) { + return eb.PutRuleBehavior.Invoke(input) +} + +// TODO: Create a default response that returns failed entries +func (eb *EventBridgeAPI) PutTargetsWithContext(_ context.Context, input *eventbridge.PutTargetsInput, _ ...request.Option) (*eventbridge.PutTargetsOutput, error) { + return eb.PutTargetsBehavior.Invoke(input) +} + +func (eb *EventBridgeAPI) ListRulesWithContext(_ context.Context, input *eventbridge.ListRulesInput, _ ...request.Option) (*eventbridge.ListRulesOutput, error) { + return eb.ListRulesBehavior.Invoke(input) +} + +func (eb *EventBridgeAPI) ListTagsForResourceWithContext(_ context.Context, input *eventbridge.ListTagsForResourceInput, _ ...request.Option) (*eventbridge.ListTagsForResourceOutput, error) { + return eb.ListTagsForResourceBehavior.Invoke(input) +} + +func (eb *EventBridgeAPI) DeleteRuleWithContext(_ context.Context, input *eventbridge.DeleteRuleInput, _ ...request.Option) (*eventbridge.DeleteRuleOutput, error) { + return eb.DeleteRuleBehavior.Invoke(input) +} + +func (eb *EventBridgeAPI) RemoveTargetsWithContext(_ context.Context, input *eventbridge.RemoveTargetsInput, _ ...request.Option) (*eventbridge.RemoveTargetsOutput, error) { + return eb.RemoveTargetsBehavior.Invoke(input) +} diff --git a/pkg/fake/sqsapi.go b/pkg/fake/sqsapi.go new file mode 100644 index 000000000000..60ef785dc62e --- /dev/null +++ b/pkg/fake/sqsapi.go @@ -0,0 +1,94 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "context" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sqs/sqsiface" +) + +const ( + dummyQueueURL = "https://sqs.us-west-2.amazonaws.com/000000000000/Karpenter-cluster-Queue" +) + +// SQSBehavior must be reset between tests otherwise tests will +// pollute each other. +type SQSBehavior struct { + CreateQueueBehavior MockedFunction[sqs.CreateQueueInput, sqs.CreateQueueOutput] + GetQueueURLBehavior MockedFunction[sqs.GetQueueUrlInput, sqs.GetQueueUrlOutput] + GetQueueAttributesBehavior MockedFunction[sqs.GetQueueAttributesInput, sqs.GetQueueAttributesOutput] + SetQueueAttributesBehavior MockedFunction[sqs.SetQueueAttributesInput, sqs.SetQueueAttributesOutput] + ReceiveMessageBehavior MockedFunction[sqs.ReceiveMessageInput, sqs.ReceiveMessageOutput] + DeleteMessageBehavior MockedFunction[sqs.DeleteMessageInput, sqs.DeleteMessageOutput] + DeleteQueueBehavior MockedFunction[sqs.DeleteQueueInput, sqs.DeleteQueueOutput] +} + +type SQSAPI struct { + sqsiface.SQSAPI + SQSBehavior +} + +// Reset must be called between tests otherwise tests will pollute +// each other. +func (s *SQSAPI) Reset() { + s.CreateQueueBehavior.Reset() + s.GetQueueURLBehavior.Reset() + s.GetQueueAttributesBehavior.Reset() + s.SetQueueAttributesBehavior.Reset() + s.ReceiveMessageBehavior.Reset() + s.DeleteMessageBehavior.Reset() + s.DeleteQueueBehavior.Reset() +} + +func (s *SQSAPI) CreateQueueWithContext(_ context.Context, input *sqs.CreateQueueInput, _ ...request.Option) (*sqs.CreateQueueOutput, error) { + return s.CreateQueueBehavior.WithDefault(&sqs.CreateQueueOutput{ + QueueUrl: aws.String(dummyQueueURL), + }).Invoke(input) +} + +//nolint:revive,stylecheck +func (s *SQSAPI) GetQueueUrlWithContext(_ context.Context, input *sqs.GetQueueUrlInput, _ ...request.Option) (*sqs.GetQueueUrlOutput, error) { + return s.GetQueueURLBehavior.WithDefault(&sqs.GetQueueUrlOutput{ + QueueUrl: aws.String(dummyQueueURL), + }).Invoke(input) +} + +func (s *SQSAPI) GetQueueAttributesWithContext(_ context.Context, input *sqs.GetQueueAttributesInput, _ ...request.Option) (*sqs.GetQueueAttributesOutput, error) { + return s.GetQueueAttributesBehavior.WithDefault(&sqs.GetQueueAttributesOutput{ + Attributes: map[string]*string{ + sqs.QueueAttributeNameQueueArn: aws.String("arn:aws:sqs:us-west-2:000000000000:Karpenter-Queue"), + }, + }).Invoke(input) +} + +func (s *SQSAPI) SetQueueAttributesWithContext(_ context.Context, input *sqs.SetQueueAttributesInput, _ ...request.Option) (*sqs.SetQueueAttributesOutput, error) { + return s.SetQueueAttributesBehavior.Invoke(input) +} + +func (s *SQSAPI) ReceiveMessageWithContext(_ context.Context, input *sqs.ReceiveMessageInput, _ ...request.Option) (*sqs.ReceiveMessageOutput, error) { + return s.ReceiveMessageBehavior.Invoke(input) +} + +func (s *SQSAPI) DeleteMessageWithContext(_ context.Context, input *sqs.DeleteMessageInput, _ ...request.Option) (*sqs.DeleteMessageOutput, error) { + return s.DeleteMessageBehavior.Invoke(input) +} + +func (s *SQSAPI) DeleteQueueWithContext(_ context.Context, input *sqs.DeleteQueueInput, _ ...request.Option) (*sqs.DeleteQueueOutput, error) { + return s.DeleteQueueBehavior.Invoke(input) +} diff --git a/pkg/fake/types.go b/pkg/fake/types.go new file mode 100644 index 000000000000..83a8b3e5bfe8 --- /dev/null +++ b/pkg/fake/types.go @@ -0,0 +1,76 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "sync/atomic" +) + +type MockedFunction[I any, O any] struct { + Output AtomicPtr[O] // Output to return on call to this function + CalledWithInput AtomicPtrSlice[I] // Slice used to keep track of passed input to this function + Error AtomicError // Error to return a certain number of times defined by custom error options + + defaultOutput AtomicPtr[O] // Default output stores the default output if Output isn't set + successfulCalls atomic.Int32 // Internal construct to keep track of the number of times this function has successfully been called + failedCalls atomic.Int32 // Internal construct to keep track of the number of times this function has failed (with error) +} + +// Reset must be called between tests otherwise tests will pollute +// each other. +func (m *MockedFunction[I, O]) Reset() { + m.Output.Reset() + m.CalledWithInput.Reset() + m.Error.Reset() + + m.defaultOutput.Reset() + m.successfulCalls.Store(0) + m.failedCalls.Store(0) +} + +func (m *MockedFunction[I, O]) WithDefault(output *O) *MockedFunction[I, O] { + m.defaultOutput.Set(output) + return m +} + +func (m *MockedFunction[I, O]) Invoke(input *I) (*O, error) { + err := m.Error.Get() + if err != nil { + m.failedCalls.Add(1) + return nil, err + } + m.CalledWithInput.Add(input) + m.successfulCalls.Add(1) + + if !m.Output.IsNil() { + return m.Output.Clone(), nil + } + if !m.defaultOutput.IsNil() { + return m.defaultOutput.Clone(), nil + } + return new(O), nil +} + +func (m *MockedFunction[I, O]) Calls() int { + return m.SuccessfulCalls() + m.FailedCalls() +} + +func (m *MockedFunction[I, O]) SuccessfulCalls() int { + return int(m.successfulCalls.Load()) +} + +func (m *MockedFunction[I, O]) FailedCalls() int { + return int(m.failedCalls.Load()) +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 1e1bacadd8e4..34c7d1bf59a0 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -15,16 +15,11 @@ limitations under the License. package utils import ( - "context" - "crypto/sha256" - "encoding/hex" "fmt" "regexp" v1 "k8s.io/api/core/v1" "knative.dev/pkg/ptr" - - "github.com/aws/karpenter-core/pkg/operator/injection" ) // ParseInstanceID parses the provider ID stored on the node to get the instance ID @@ -42,11 +37,3 @@ func ParseInstanceID(node *v1.Node) (*string, error) { } return nil, fmt.Errorf("parsing instance id %s", node.Spec.ProviderID) } - -// GetClusterNameHash gets the SHA256 hex-encoded checksum of the cluster name, truncated at the passed truncatedAt -func GetClusterNameHash(ctx context.Context, truncateAt int) string { - h := sha256.New() - h.Write([]byte(injection.GetOptions(ctx).ClusterName)) - checkSum := h.Sum([]byte{}) - return hex.EncodeToString(checkSum)[:truncateAt] -} diff --git a/test/go.mod b/test/go.mod index 599f2fb73b1c..a416ce570278 100644 --- a/test/go.mod +++ b/test/go.mod @@ -3,12 +3,15 @@ module github.com/aws/karpenter/test go 1.19 require ( + github.com/aws/amazon-ec2-spot-interrupter v0.0.9 github.com/aws/aws-sdk-go v1.44.127 + github.com/aws/aws-sdk-go-v2/config v1.17.10 github.com/aws/karpenter v0.18.0 github.com/aws/karpenter-core v0.0.2-0.20221102174542-079bcf63322e github.com/onsi/ginkgo/v2 v2.4.0 github.com/onsi/gomega v1.24.0 github.com/samber/lo v1.33.0 + go.uber.org/multierr v1.8.0 k8s.io/api v0.25.2 k8s.io/apimachinery v0.25.2 k8s.io/client-go v0.25.2 @@ -20,6 +23,20 @@ require ( github.com/Pallinder/go-randomdata v1.2.0 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/aws/aws-sdk-go-v2 v1.17.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.12.23 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/service/ec2 v1.37.0 // indirect + github.com/aws/aws-sdk-go-v2/service/fis v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/iam v1.18.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.11.25 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.17.1 // indirect + github.com/aws/smithy-go v1.13.4 // indirect github.com/benbjohnson/clock v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blendle/zapdriver v1.3.1 // indirect @@ -61,7 +78,6 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect diff --git a/test/go.sum b/test/go.sum index efc68c35795c..03a8df3e5070 100644 --- a/test/go.sum +++ b/test/go.sum @@ -44,10 +44,50 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/aws/amazon-ec2-spot-interrupter v0.0.9 h1:2yRK7f29tPhrkFBn1lg6QZNjY3iE1ovsjom99OnBCDw= +github.com/aws/amazon-ec2-spot-interrupter v0.0.9/go.mod h1:TwqwmD9RUPwjjcyklxGlzxIxbA6oRfDn6lQf0Muu8/A= github.com/aws/aws-sdk-go v1.44.127 h1:IoO2VfuIQg1aMXnl8l6OpNUKT4Qq5CnJMOyIWoTYXj0= github.com/aws/aws-sdk-go v1.44.127/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= +github.com/aws/aws-sdk-go-v2 v1.16.3/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= +github.com/aws/aws-sdk-go-v2 v1.17.1 h1:02c72fDJr87N8RAC2s3Qu0YuvMRZKNZJ9F+lAehCazk= +github.com/aws/aws-sdk-go-v2 v1.17.1/go.mod h1:JLnGeGONAyi2lWXI1p0PCIOIy333JMVK1U7Hf0aRFLw= +github.com/aws/aws-sdk-go-v2/config v1.17.10 h1:zBy5QQ/mkvHElM1rygHPAzuH+sl8nsdSaxSWj0+rpdE= +github.com/aws/aws-sdk-go-v2/config v1.17.10/go.mod h1:/4np+UiJJKpWHN7Q+LZvqXYgyjgeXm5+lLfDI6TPZao= +github.com/aws/aws-sdk-go-v2/credentials v1.12.23 h1:LctvcJMIb8pxvk5hQhChpCu0WlU6oKQmcYb1HA4IZSA= +github.com/aws/aws-sdk-go-v2/credentials v1.12.23/go.mod h1:0awX9iRr/+UO7OwRQFpV1hNtXxOVuehpjVEzrIAYNcA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 h1:E3PXZSI3F2bzyj6XxUXdTIfvp425HHhwKsFvmzBwHgs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19/go.mod h1:VihW95zQpeKQWVPGkwT+2+WJNQV8UXFfMTWdU6VErL8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9/go.mod h1:AnVH5pvai0pAF4lXRq0bmhbes1u9R8wTE+g+183bZNM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10/go.mod h1:F+EZtuIwjlv35kRJPyBGcsA4f7bnSoz15zOQ2lJq1Z4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25 h1:nBO/RFxeq/IS5G9Of+ZrgucRciie2qpLy++3UGZ+q2E= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3/go.mod h1:ssOhaLpRlh88H3UmEcsBoVKq309quMvm3Ds8e9d4eJM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4/go.mod h1:8glyUqVIM4AmeenIsPo0oVh3+NUwnsQml2OFupfQW+0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19 h1:oRHDrwCTVT8ZXi4sr9Ld+EXk7N/KGssOr2ygNeojEhw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19/go.mod h1:6Q0546uHDp421okhmmGfbxzq2hBqbXFNpi4k+Q1JnQA= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26 h1:Mza+vlnZr+fPKFKRq/lKGVvM6B/8ZZmNdEopOwSQLms= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26/go.mod h1:Y2OJ+P+MC1u1VKnavT+PshiEuGPyh/7DqxoDNij4/bg= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.37.0 h1:zvVR76AXaNElDx6BwOjcxrk4cffFVxx0shQe8yRg2V8= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.37.0/go.mod h1:KOy1O7Fc2+GRgsbn/Kjr15vYDVXMEQALBaPRia3twSY= +github.com/aws/aws-sdk-go-v2/service/fis v1.12.3 h1:jOr6HpAfzh8Pk/ji0QstROuOIk4vFagtm3eDLW2Dkm4= +github.com/aws/aws-sdk-go-v2/service/fis v1.12.3/go.mod h1:qOT644wBvlD/dOHzCUJqvgAqR3UAqV3Almli1PrUrCg= +github.com/aws/aws-sdk-go-v2/service/iam v1.18.3 h1:wllKL2fLtvfaNAVbXKMRmM/mD1oDNw0hXmDn8mE/6Us= +github.com/aws/aws-sdk-go-v2/service/iam v1.18.3/go.mod h1:51xGfEjd1HXnTzw2mAp++qkRo+NyGYblZkuGTsb49yw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.4/go.mod h1:uKkN7qmSIsNJVyMtxNQoCEYMvFEXbOg9fwCJPdfp2u8= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19 h1:GE25AWCdNUPh9AOJzI9KIJnja7IwUc1WyUqz/JTyJ/I= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19/go.mod h1:02CP6iuYP+IVnBX5HULVdSAku/85eHB2Y9EsFhrkEwU= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.25 h1:GFZitO48N/7EsFDt8fMa5iYdmWqkUDDB3Eje6z3kbG0= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.25/go.mod h1:IARHuzTXmj1C0KS35vboR0FeJ89OkEy1M9mWbK2ifCI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 h1:jcw6kKZrtNfBPJkaHrscDOZoe5gvi9wjudnxvozYFJo= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8/go.mod h1:er2JHN+kBY6FcMfcBBKNGCT3CarImmdFzishsqBmSRI= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.1 h1:KRAix/KHvjGODaHAMXnxRk9t0D+4IJVUuS/uwXxngXk= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.1/go.mod h1:bXcN3koeVYiJcdDU89n3kCYILob7Y34AeLopUbZgLT4= github.com/aws/karpenter-core v0.0.2-0.20221102174542-079bcf63322e h1:bmYEH+y4A3g0jQPOSBt1Z1PSMSX+LaCxdRIALyoMCy0= github.com/aws/karpenter-core v0.0.2-0.20221102174542-079bcf63322e/go.mod h1:LqX9wVsuF9sN/yd18TtMAMHmulc+chsA5AsbtuwVTUI= +github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= +github.com/aws/smithy-go v1.13.4 h1:/RN2z1txIJWeXeOkzX+Hk/4Uuvv7dWtCjbmVJcrskyk= +github.com/aws/smithy-go v1.13.4/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -161,6 +201,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/test/infrastructure/clusters/test-infra/karpenter-tests/pipeline-trigger-cron.yaml b/test/infrastructure/clusters/test-infra/karpenter-tests/pipeline-trigger-cron.yaml index f0e7a48e56e5..1161efd6ca1c 100644 --- a/test/infrastructure/clusters/test-infra/karpenter-tests/pipeline-trigger-cron.yaml +++ b/test/infrastructure/clusters/test-infra/karpenter-tests/pipeline-trigger-cron.yaml @@ -37,7 +37,7 @@ data: pipelines-trigger.sh: |+ #!/usr/bin/env bash set -euo pipefail - for suite in "Integration" "Consolidation" "Utilization"; do + for suite in "Integration" "Consolidation" "Utilization" "Interruption"; do cat <.*)/(?P.*)`) + matches := r.FindStringSubmatch(pid) + if matches == nil { + return "" + } + for i, name := range r.SubexpNames() { + if name == "InstanceID" { + return matches[i] + } + } + return "" +} diff --git a/website/content/en/preview/getting-started/getting-started-with-eksctl/cloudformation.yaml b/website/content/en/preview/getting-started/getting-started-with-eksctl/cloudformation.yaml index c091c5c2d807..dab54b975386 100644 --- a/website/content/en/preview/getting-started/getting-started-with-eksctl/cloudformation.yaml +++ b/website/content/en/preview/getting-started/getting-started-with-eksctl/cloudformation.yaml @@ -64,3 +64,37 @@ Resources: Action: - iam:PassRole Resource: !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/KarpenterNodeRole-${ClusterName}" + KarpenterEventPolicy: + Type: AWS::IAM::ManagedPolicy + Properties: + ManagedPolicyName: !Sub "KarpenterEventPolicy-${ClusterName}" + PolicyDocument: + Version: "2012-10-17" + Statement: + - Effect: Allow + Resource: !Sub "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${ClusterName}" + Action: + # Write Operations + - sqs:CreateQueue + - sqs:TagQueue + - sqs:SetQueueAttributes + - sqs:DeleteQueue + - sqs:DeleteMessage + # Read Operations + - sqs:GetQueueUrl + - sqs:GetQueueAttributes + - sqs:ReceiveMessage + - Effect: Allow + Resource: !Sub "arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/Karpenter-*" + Action: + # Write Operations + - events:PutRule + - events:TagResource + - events:PutTargets + - events:DeleteRule + - events:RemoveTargets + - events:ListTagsForResource + - Effect: Allow + Resource: "*" + Action: + - events:ListRules \ No newline at end of file diff --git a/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step03-iam-cloud-formation.sh b/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step03-iam-cloud-formation.sh index dce544c1a557..e07c4220ae38 100755 --- a/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step03-iam-cloud-formation.sh +++ b/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step03-iam-cloud-formation.sh @@ -5,4 +5,4 @@ curl -fsSL https://karpenter.sh/"${KARPENTER_VERSION}"/getting-started/getting-s --stack-name "Karpenter-${CLUSTER_NAME}" \ --template-file "${TEMPOUT}" \ --capabilities CAPABILITY_NAMED_IAM \ - --parameter-overrides "ClusterName=${CLUSTER_NAME}" + --parameter-overrides "ClusterName=${CLUSTER_NAME}" \ No newline at end of file diff --git a/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step05-controller-iam.sh b/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step05-controller-iam.sh index 32673a49c9e0..0940503acf26 100755 --- a/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step05-controller-iam.sh +++ b/website/content/en/preview/getting-started/getting-started-with-eksctl/scripts/step05-controller-iam.sh @@ -2,6 +2,7 @@ eksctl create iamserviceaccount \ --cluster "${CLUSTER_NAME}" --name karpenter --namespace karpenter \ --role-name "${CLUSTER_NAME}-karpenter" \ --attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KarpenterControllerPolicy-${CLUSTER_NAME}" \ + --attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KarpenterEventPolicy-${CLUSTER_NAME}" \ --role-only \ --approve