From 85be7abb798dc246fea8e362f2df1404da71ecb7 Mon Sep 17 00:00:00 2001 From: Saurav Agarwalla Date: Sat, 21 May 2022 04:30:54 +0000 Subject: [PATCH] [FIXES] [Tagging Controller] Fix issues in tagging controller * Don't tag Fargate nodes * Label nodes to prevent retagging them again * Increase bucket size for latency metrics --- pkg/controllers/tagging/metrics.go | 1 + pkg/controllers/tagging/tagging_controller.go | 95 ++++++++++++++++--- .../tagging/tagging_controller_test.go | 52 +++++++++- pkg/providers/v1/aws.go | 14 +-- pkg/providers/v1/instances.go | 2 +- 5 files changed, 141 insertions(+), 23 deletions(-) diff --git a/pkg/controllers/tagging/metrics.go b/pkg/controllers/tagging/metrics.go index 5a263086f2..6ac3a9db2f 100644 --- a/pkg/controllers/tagging/metrics.go +++ b/pkg/controllers/tagging/metrics.go @@ -27,6 +27,7 @@ var ( Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds", Help: "workitem latency of workitem being in the queue and time it takes to process", StabilityLevel: metrics.ALPHA, + Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20), }, []string{"latency_type"}) diff --git a/pkg/controllers/tagging/tagging_controller.go b/pkg/controllers/tagging/tagging_controller.go index 9e56cb6337..d20a9defbb 100644 --- a/pkg/controllers/tagging/tagging_controller.go +++ b/pkg/controllers/tagging/tagging_controller.go @@ -14,6 +14,7 @@ limitations under the License. 
package tagging import ( + "crypto/md5" "fmt" v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -25,7 +26,10 @@ import ( cloudprovider "k8s.io/cloud-provider" opt "k8s.io/cloud-provider-aws/pkg/controllers/options" awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" + nodehelpers "k8s.io/cloud-provider/node/helpers" "k8s.io/klog/v2" + "sort" + "strings" "time" ) @@ -37,7 +41,13 @@ type workItem struct { enqueueTime time.Time } +func (w workItem) String() string { + return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime) +} + const ( + taggingControllerLabelKey = "k8s.io/cloud-provider-aws" + maxRequeuingCount = 9 // The label for depicting total number of errors a work item encounter and succeed @@ -105,9 +115,27 @@ func NewTaggingController( // Use shared informer to listen to add/update/delete of nodes. Note that any nodes // that exist before tagging controller starts will show up in the update method tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.tagNodesResources) }, - UpdateFunc: func(oldObj, newObj interface{}) { tc.enqueueNode(newObj, tc.tagNodesResources) }, - DeleteFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.untagNodeResources) }, + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + tc.enqueueNode(node, tc.tagNodesResources) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + node := newObj.(*v1.Node) + // Check if tagging is required by inspecting the labels. This check prevents us from putting an already tagged node into the + // work queue. We check again right before tagging the node because, between when a node is put in the work queue and when + // it gets tagged, another event may put the same node in the work queue (since the node won't have the labels yet); that + // second check prevents us from making an unnecessary EC2 call.
+ if !tc.isTaggingRequired(node) { + klog.Infof("Skip putting node %s in work queue since it was already tagged earlier.", node.GetName()) + return + } + + tc.enqueueNode(node, tc.tagNodesResources) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1.Node) + tc.enqueueNode(node, tc.untagNodeResources) + }, }) return tc, nil @@ -147,7 +175,7 @@ func (tc *Controller) process() bool { return false } - klog.Infof("Starting to process %v", obj) + klog.Infof("Starting to process %s", obj) err := func(obj interface{}) error { defer tc.workqueue.Done(obj) @@ -155,13 +183,14 @@ func (tc *Controller) process() bool { workItem, ok := obj.(*workItem) if !ok { tc.workqueue.Forget(obj) - err := fmt.Errorf("expected workItem in workqueue but got %#v", obj) + err := fmt.Errorf("expected workItem in workqueue but got %v", obj) utilruntime.HandleError(err) return nil } timeTaken := time.Since(workItem.enqueueTime).Seconds() recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken) + klog.Infof("Dequeuing latency %f", timeTaken) instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID() if err != nil { @@ -169,6 +198,13 @@ func (tc *Controller) process() bool { utilruntime.HandleError(err) return nil } + klog.Infof("Instance ID of work item %s is %s", workItem, instanceID) + + if awsv1.IsFargateNode(string(instanceID)) { + klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID) + tc.workqueue.Forget(obj) + return nil + } err = workItem.action(workItem.node) @@ -182,12 +218,13 @@ func (tc *Controller) process() bool { return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount) } - klog.Errorf("error processing work item '%v': %s, requeuing count exceeded", workItem, err.Error()) + klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID)) } else { - klog.Infof("Finished processing %v", workItem) + klog.Infof("Finished processing %s", workItem) timeTaken = time.Since(workItem.enqueueTime).Seconds() recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken) + klog.Infof("Processing latency %f", timeTaken) } tc.workqueue.Forget(obj) @@ -195,7 +232,7 @@ func (tc *Controller) process() bool { }(obj) if err != nil { - klog.Errorf("Error occurred while processing %v", obj) + klog.Errorf("Error occurred while processing %s", obj) utilruntime.HandleError(err) } @@ -221,16 +258,28 @@ func (tc *Controller) tagNodesResources(node *v1.Node) error { // tagEc2Instances applies the provided tags to each EC2 instance in // the cluster. func (tc *Controller) tagEc2Instance(node *v1.Node) error { + if !tc.isTaggingRequired(node) { + klog.Infof("Skip tagging node %s since it was already tagged earlier.", node.GetName()) + return nil + } + instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() err := tc.cloud.TagResource(string(instanceID), tc.tags) if err != nil { - klog.Errorf("Error in tagging EC2 instance for node %s, error: %v", node.GetName(), err) + klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err) return err } - klog.Infof("Successfully tagged %s with %v", instanceID, tc.tags) + labels := map[string]string{taggingControllerLabelKey: tc.getChecksumOfTags()} + klog.Infof("Successfully tagged %s with %v.
Labeling the nodes with tagging controller labels now.", instanceID, tc.tags) + if !nodehelpers.AddOrUpdateLabelsOnNode(tc.kubeClient, labels, node) { + klog.Errorf("Couldn't apply labels %s to node %s.", labels, node.GetName()) + return fmt.Errorf("couldn't apply labels %s to node %s", labels, node.GetName()) + } + + klog.Infof("Successfully labeled node %s with %v.", node.GetName(), labels) return nil } @@ -259,7 +308,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error { err := tc.cloud.UntagResource(string(instanceID), tc.tags) if err != nil { - klog.Errorf("Error in untagging EC2 instance for node %s, error: %v", node.GetName(), err) + klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err) return err } @@ -270,8 +319,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error { // enqueueNode takes in the object and an // action for the object for a workitem and enqueue to the workqueue -func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) error) { - node := obj.(*v1.Node) +func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) { item := &workItem{ node: node, action: action, @@ -281,3 +329,24 @@ func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) er tc.workqueue.Add(item) klog.Infof("Added %s to the workqueue", item) } + +func (tc *Controller) isTaggingRequired(node *v1.Node) bool { + if node.Labels == nil { + return true + } + + if labelValue, ok := node.Labels[taggingControllerLabelKey]; !ok || labelValue != tc.getChecksumOfTags() { + return true + } + + return false +} + +func (tc *Controller) getChecksumOfTags() string { + tags := []string{} + for key, value := range tc.tags { + tags = append(tags, key+"="+value) + } + sort.Strings(tags) + return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(tags, ",")))) +} diff --git a/pkg/controllers/tagging/tagging_controller_test.go 
b/pkg/controllers/tagging/tagging_controller_test.go index abee028e46..29ff076635 100644 --- a/pkg/controllers/tagging/tagging_controller_test.go +++ b/pkg/controllers/tagging/tagging_controller_test.go @@ -69,6 +69,54 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { toBeTagged: true, expectedMessages: []string{"Successfully tagged i-0001"}, }, + { + name: "node0 joins the cluster and was tagged earlier with different tags.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + taggingControllerLabelKey: "9767c4972ba72e87ab553bad2afde741", // MD5 for key1=value1 + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "i-0001", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Successfully tagged i-0001"}, + }, + { + name: "node0 joins the cluster but isn't tagged because it was already tagged earlier.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + taggingControllerLabelKey: "c812faa65d1d5e5aefa6b069b3da39df", // MD5 for key1=value1,key2=value2 + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "i-0001", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Skip tagging node node0 since it was already tagged earlier."}, + }, + { + name: "fargate node joins the cluster.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fargatenode0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///us-west-2a/2ea696a557-9e55466d21eb4f83a99a9aa396bbd134/fargate-ip-10-0-55-27.us-west-2.compute.internal", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a Fargate node"}, + }, { name: "node0 leaves the cluster, failed to untag.", currNode: &v1.Node{ @@ -81,7 +129,7 @@ 
func Test_NodesJoiningAndLeaving(t *testing.T) { }, }, toBeTagged: false, - expectedMessages: []string{"Error in untagging EC2 instance for node node0"}, + expectedMessages: []string{"Error in untagging EC2 instance i-error for node node0"}, }, { name: "node0 leaves the cluster.", @@ -124,7 +172,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { kubeClient: clientset, cloud: fakeAws, nodeMonitorPeriod: 1 * time.Second, - tags: map[string]string{"key": "value"}, + tags: map[string]string{"key2": "value2", "key1": "value1"}, resources: []string{"instance"}, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"), } diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index 20e3de41f3..83dc80176e 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -1836,7 +1836,7 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string return nil, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) if eni == nil || err != nil { return nil, err @@ -1879,7 +1879,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin return false, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) return eni != nil, err } @@ -1919,7 +1919,7 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str return false, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) return eni != nil, err } @@ -1980,7 +1980,7 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) return "", err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { return "", nil } @@ -2089,7 +2089,7 @@ func (c *Cloud) GetZoneByProviderID(ctx 
context.Context, providerID string) (clo return cloudprovider.Zone{}, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) if eni == nil || err != nil { return cloudprovider.Zone{}, err @@ -5248,8 +5248,8 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins return awsInstance, instance, err } -// isFargateNode returns true if given node runs on Fargate compute -func isFargateNode(nodeName string) bool { +// IsFargateNode returns true if given node runs on Fargate compute +func IsFargateNode(nodeName string) bool { return strings.HasPrefix(nodeName, fargateNodeNamePrefix) } diff --git a/pkg/providers/v1/instances.go b/pkg/providers/v1/instances.go index abf0cccb87..20977c1fe5 100644 --- a/pkg/providers/v1/instances.go +++ b/pkg/providers/v1/instances.go @@ -78,7 +78,7 @@ func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) { // We sanity check the resulting volume; the two known formats are // i-12345678 and i-12345678abcdef01 - if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || isFargateNode(awsID)) { + if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || IsFargateNode(awsID)) { return "", fmt.Errorf("Invalid format for AWS instance (%s)", name) }