From 8d768ec0d8bcfa85ed114235a2047a3fc5240627 Mon Sep 17 00:00:00 2001 From: Sushmitha Ravikumar Date: Tue, 12 Apr 2022 17:14:48 -0700 Subject: [PATCH] add event recorder utils to raise aws-node pod events --- charts/aws-vpc-cni/templates/clusterrole.yaml | 4 + cmd/aws-k8s-agent/main.go | 3 + config/master/aws-k8s-cni-cn.yaml | 4 + config/master/aws-k8s-cni-us-gov-east-1.yaml | 4 + config/master/aws-k8s-cni-us-gov-west-1.yaml | 4 + config/master/aws-k8s-cni.yaml | 4 + pkg/awsutils/awsutils.go | 40 ++++++ pkg/ipamd/ipamd.go | 9 +- pkg/utils/eventrecorder/eventrecorder.go | 100 ++++++++++++++ pkg/utils/eventrecorder/eventrecorder_test.go | 105 +++++++++++++++ test/framework/framework.go | 7 +- test/framework/resources/k8s/manager.go | 7 + .../resources/k8s/resources/events.go | 39 ++++++ .../integration-new/ipamd/ipamd_event_test.go | 123 ++++++++++++++++++ 14 files changed, 448 insertions(+), 5 deletions(-) create mode 100644 pkg/utils/eventrecorder/eventrecorder.go create mode 100644 pkg/utils/eventrecorder/eventrecorder_test.go create mode 100644 test/framework/resources/k8s/resources/events.go create mode 100644 test/integration-new/ipamd/ipamd_event_test.go diff --git a/charts/aws-vpc-cni/templates/clusterrole.yaml b/charts/aws-vpc-cni/templates/clusterrole.yaml index d878203092..c7102ae444 100644 --- a/charts/aws-vpc-cni/templates/clusterrole.yaml +++ b/charts/aws-vpc-cni/templates/clusterrole.yaml @@ -33,3 +33,7 @@ rules: resources: - '*' verbs: ["list", "watch"] + - apiGroups: ["", "events.k8s.io"] + resources: + - events + verbs: ["create", "patch", "list", "get"] diff --git a/cmd/aws-k8s-agent/main.go b/cmd/aws-k8s-agent/main.go index 1d3b7dde59..1fde848113 100644 --- a/cmd/aws-k8s-agent/main.go +++ b/cmd/aws-k8s-agent/main.go @@ -19,6 +19,7 @@ import ( "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd" "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" + "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/eventrecorder" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" 
"github.com/aws/amazon-vpc-cni-k8s/pkg/version" ) @@ -62,6 +63,8 @@ func _main() int { return 1 } + eventrecorder.InitEventRecorder(rawK8SClient) + ipamContext, err := ipamd.New(rawK8SClient, cacheK8SClient) if err != nil { log.Errorf("Initialization failure: %v", err) diff --git a/config/master/aws-k8s-cni-cn.yaml b/config/master/aws-k8s-cni-cn.yaml index 874c95a85a..0720941b4b 100644 --- a/config/master/aws-k8s-cni-cn.yaml +++ b/config/master/aws-k8s-cni-cn.yaml @@ -70,6 +70,10 @@ rules: resources: - '*' verbs: ["list", "watch"] + - apiGroups: ["", "events.k8s.io"] + resources: + - events + verbs: ["create", "patch", "list", "get"] --- # Source: aws-vpc-cni/templates/clusterrolebinding.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/config/master/aws-k8s-cni-us-gov-east-1.yaml b/config/master/aws-k8s-cni-us-gov-east-1.yaml index f4dc1199dd..8660e292bb 100644 --- a/config/master/aws-k8s-cni-us-gov-east-1.yaml +++ b/config/master/aws-k8s-cni-us-gov-east-1.yaml @@ -70,6 +70,10 @@ rules: resources: - '*' verbs: ["list", "watch"] + - apiGroups: ["", "events.k8s.io"] + resources: + - events + verbs: ["create", "patch", "list", "get"] --- # Source: aws-vpc-cni/templates/clusterrolebinding.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/config/master/aws-k8s-cni-us-gov-west-1.yaml b/config/master/aws-k8s-cni-us-gov-west-1.yaml index 53dee36d0d..0fd69f1120 100644 --- a/config/master/aws-k8s-cni-us-gov-west-1.yaml +++ b/config/master/aws-k8s-cni-us-gov-west-1.yaml @@ -70,6 +70,10 @@ rules: resources: - '*' verbs: ["list", "watch"] + - apiGroups: ["", "events.k8s.io"] + resources: + - events + verbs: ["create", "patch", "list", "get"] --- # Source: aws-vpc-cni/templates/clusterrolebinding.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/config/master/aws-k8s-cni.yaml b/config/master/aws-k8s-cni.yaml index 3e89a90d53..91632f453a 100644 --- a/config/master/aws-k8s-cni.yaml +++ b/config/master/aws-k8s-cni.yaml @@ -70,6 +70,10 @@ rules: 
resources: - '*' verbs: ["list", "watch"] + - apiGroups: ["", "events.k8s.io"] + resources: + - events + verbs: ["create", "patch", "list", "get"] --- # Source: aws-vpc-cni/templates/clusterrolebinding.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/pkg/awsutils/awsutils.go b/pkg/awsutils/awsutils.go index a8c00e9b9d..d8c297b579 100644 --- a/pkg/awsutils/awsutils.go +++ b/pkg/awsutils/awsutils.go @@ -28,6 +28,7 @@ import ( "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils/awssession" "github.com/aws/amazon-vpc-cni-k8s/pkg/ec2wrapper" + "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/eventrecorder" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/retry" "github.com/aws/aws-sdk-go/aws" @@ -36,6 +37,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" ) @@ -78,6 +80,9 @@ var ( var log = logger.Get() +// event recorder to raise aws-node pod events eg. 
if EC2 API calls fail due to UnauthorizedOperation error +var eventRecorder *eventrecorder.EventRecorder + var ( awsAPILatency = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -411,6 +416,8 @@ func New(useCustomNetworking, disableENIProvisioning, v4Enabled, v6Enabled bool) if err != nil { return nil, err } + // event recorder to raise events for failed EC2 API calls + eventRecorder = eventrecorder.Get() // Clean up leaked ENIs in the background if !disableENIProvisioning { @@ -555,6 +562,7 @@ func (cache *EC2InstanceMetadataCache) RefreshSGIDs(mac string) error { awsAPIErrInc("IMDSMetaDataOutOfSync", err) } } + CheckAPIErrorAndBroadcastEvent(err, "ec2:ModifyNetworkInterfaceAttribute") awsAPIErrInc("ModifyNetworkInterfaceAttribute", err) //No need to return error here since retry will happen in 30seconds and also //If update failed due to stale ENI then returning error will prevent updating SG @@ -695,6 +703,7 @@ func (cache *EC2InstanceMetadataCache) awsGetFreeDeviceNumber() (int, error) { result, err := cache.ec2SVC.DescribeInstancesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("DescribeInstances", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeInstances") awsAPIErrInc("DescribeInstances", err) log.Errorf("awsGetFreeDeviceNumber: Unable to retrieve instance data from EC2 control plane %v", err) return 0, errors.Wrap(err, @@ -758,6 +767,7 @@ func (cache *EC2InstanceMetadataCache) AllocENI(useCustomCfg bool, sg []*string, _, err = cache.ec2SVC.ModifyNetworkInterfaceAttributeWithContext(context.Background(), attributeInput) awsAPILatency.WithLabelValues("ModifyNetworkInterfaceAttribute", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:ModifyNetworkInterfaceAttribute") awsAPIErrInc("ModifyNetworkInterfaceAttribute", err) err := cache.FreeENI(eniID) if err != nil { @@ -787,6 
+797,7 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) { attachOutput, err := cache.ec2SVC.AttachNetworkInterfaceWithContext(context.Background(), attachInput) awsAPILatency.WithLabelValues("AttachNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:AttachNetworkInterface") awsAPIErrInc("AttachNetworkInterface", err) log.Errorf("Failed to attach ENI %s: %v", eniID, err) return "", errors.Wrap(err, "attachENI: failed to attach ENI") @@ -835,6 +846,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string result, err := cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input) awsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:CreateNetworkInterface") awsAPIErrInc("CreateNetworkInterface", err) log.Errorf("Failed to CreateNetworkInterface %v", err) return "", errors.Wrap(err, "failed to create network interface") @@ -884,6 +896,7 @@ func (cache *EC2InstanceMetadataCache) TagENI(eniID string, currentTags map[stri _, err := cache.ec2SVC.CreateTagsWithContext(context.Background(), input) awsAPILatency.WithLabelValues("CreateTags", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:CreateTags") awsAPIErrInc("CreateTags", err) log.Warnf("Failed to tag the newly created ENI %s:", eniID) return err @@ -942,6 +955,7 @@ func (cache *EC2InstanceMetadataCache) freeENI(eniName string, sleepDelayAfterDe _, ec2Err := cache.ec2SVC.DetachNetworkInterfaceWithContext(context.Background(), detachInput) awsAPILatency.WithLabelValues("DetachNetworkInterface", fmt.Sprint(ec2Err != nil), awsReqStatus(ec2Err)).Observe(msSince(start)) if ec2Err != nil { + CheckAPIErrorAndBroadcastEvent(ec2Err, "ec2:DetachNetworkInterface") 
awsAPIErrInc("DetachNetworkInterface", ec2Err) log.Errorf("Failed to detach ENI %s %v", eniName, ec2Err) return errors.New("unable to detach ENI from EC2 instance, giving up") @@ -982,6 +996,7 @@ func (cache *EC2InstanceMetadataCache) getENIAttachmentID(eniID string) (*string return nil, ErrENINotFound } } + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") awsAPIErrInc("DescribeNetworkInterfaces", err) log.Errorf("Failed to get ENI %s information from EC2 control plane %v", eniID, err) return nil, errors.Wrap(err, "failed to describe network interface") @@ -1019,6 +1034,7 @@ func (cache *EC2InstanceMetadataCache) deleteENI(eniName string, maxBackoffDelay return nil } } + CheckAPIErrorAndBroadcastEvent(ec2Err, "ec2:DeleteNetworkInterface") awsAPIErrInc("DeleteNetworkInterface", ec2Err) log.Debugf("Not able to delete ENI: %v ", ec2Err) return errors.Wrapf(ec2Err, "unable to delete ENI") @@ -1044,6 +1060,7 @@ func (cache *EC2InstanceMetadataCache) GetIPv4sFromEC2(eniID string) (addrList [ return nil, ErrENINotFound } } + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") awsAPIErrInc("DescribeNetworkInterfaces", err) log.Errorf("Failed to get ENI %s information from EC2 control plane %v", eniID, err) return nil, errors.Wrap(err, "failed to describe network interface") @@ -1071,7 +1088,9 @@ func (cache *EC2InstanceMetadataCache) GetIPv4PrefixesFromEC2(eniID string) (add if aerr.Code() == "InvalidNetworkInterfaceID.NotFound" { return nil, ErrENINotFound } + } + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") awsAPIErrInc("DescribeNetworkInterfaces", err) log.Errorf("Failed to get ENI %s information from EC2 control plane %v", eniID, err) return nil, errors.Wrap(err, "failed to describe network interface") @@ -1099,7 +1118,9 @@ func (cache *EC2InstanceMetadataCache) GetIPv6PrefixesFromEC2(eniID string) (add if aerr.Code() == "InvalidNetworkInterfaceID.NotFound" { return nil, ErrENINotFound } + } + 
CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") awsAPIErrInc("DescribeNetworkInterfaces", err) log.Errorf("Failed to get ENI %s information from EC2 control plane %v", eniID, err) return nil, errors.Wrap(err, "failed to describe network interface") @@ -1140,6 +1161,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (DescribeAllENIsResult, break } awsAPIErrInc("DescribeNetworkInterfaces", err) + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") log.Errorf("Failed to call ec2:DescribeNetworkInterfaces for %v: %v", aws.StringValueSlice(input.NetworkInterfaceIds), err) if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == "InvalidNetworkInterfaceID.NotFound" { @@ -1327,6 +1349,7 @@ func (cache *EC2InstanceMetadataCache) AllocIPAddress(eniID string) error { output, err := cache.ec2SVC.AssignPrivateIpAddressesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("AssignPrivateIpAddresses", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:AssignPrivateIpAddresses") awsAPIErrInc("AssignPrivateIpAddresses", err) log.Errorf("Failed to allocate a private IP address %v", err) return errors.Wrap(err, "failed to assign private IP addresses") @@ -1347,6 +1370,7 @@ func (cache *EC2InstanceMetadataCache) FetchInstanceTypeLimits() error { describeInstanceTypesInput := &ec2.DescribeInstanceTypesInput{InstanceTypes: []*string{aws.String(cache.instanceType)}} output, err := cache.ec2SVC.DescribeInstanceTypesWithContext(context.Background(), describeInstanceTypesInput) if err != nil || len(output.InstanceTypes) != 1 { + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeInstanceTypes") return errors.New(fmt.Sprintf("Failed calling DescribeInstanceTypes for `%s`: %v", cache.instanceType, err)) } info := output.InstanceTypes[0] @@ -1451,6 +1475,7 @@ func (cache *EC2InstanceMetadataCache) AllocIPAddresses(eniID string, numIPs int output, err := 
cache.ec2SVC.AssignPrivateIpAddressesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("AssignPrivateIpAddresses", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:AssignPrivateIpAddresses") if containsPrivateIPAddressLimitExceededError(err) { log.Debug("AssignPrivateIpAddresses returned PrivateIpAddressLimitExceeded. This can happen if the data store is out of sync." + "Returning without an error here since we will verify the actual state by calling EC2 to see what addresses have already assigned to this ENI.") @@ -1480,6 +1505,7 @@ func (cache *EC2InstanceMetadataCache) AllocIPv6Prefixes(eniID string) ([]*strin output, err := cache.ec2SVC.AssignIpv6AddressesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("AssignIpv6AddressesWithContext", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:AssignIpv6Addresses") log.Errorf("Failed to allocate IPv6 Prefixes on ENI %v: %v", eniID, err) awsAPIErrInc("AssignPrivateIpv6Addresses", err) return nil, errors.Wrap(err, "allocate IPv6 prefix: failed to allocate an IPv6 prefix address") @@ -1578,6 +1604,7 @@ func (cache *EC2InstanceMetadataCache) DeallocIPAddresses(eniID string, ips []st _, err := cache.ec2SVC.UnassignPrivateIpAddressesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("UnassignPrivateIpAddresses", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:UnassignPrivateIpAddresses") awsAPIErrInc("UnassignPrivateIpAddresses", err) log.Errorf("Failed to deallocate a private IP address %v", err) return errors.Wrap(err, fmt.Sprintf("deallocate IP addresses: failed to deallocate private IP addresses: %s", ips)) @@ -1603,6 +1630,7 @@ func (cache *EC2InstanceMetadataCache) DeallocPrefixAddresses(eniID string, pref _, err := 
cache.ec2SVC.UnassignPrivateIpAddressesWithContext(context.Background(), input) awsAPILatency.WithLabelValues("UnassignPrivateIpAddresses", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:UnassignPrivateIpAddresses") awsAPIErrInc("UnassignPrivateIpAddresses", err) log.Errorf("Failed to deallocate a Prefixes address %v", err) return errors.Wrap(err, fmt.Sprintf("deallocate prefix: failed to deallocate Prefix addresses: %v", prefixes)) @@ -1662,6 +1690,7 @@ func (cache *EC2InstanceMetadataCache) tagENIcreateTS(eniID string, maxBackoffDe _, err := cache.ec2SVC.CreateTagsWithContext(context.Background(), input) awsAPILatency.WithLabelValues("CreateTags", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start)) if err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:CreateTags") awsAPIErrInc("CreateTags", err) log.Warnf("Failed to add tag to ENI %s: %v", eniID, err) return err @@ -1836,6 +1865,8 @@ func (cache *EC2InstanceMetadataCache) getENIsFromPaginatedDescribeNetworkInterf } if err := cache.ec2SVC.DescribeNetworkInterfacesPagesWithContext(context.TODO(), input, pageFn); err != nil { + CheckAPIErrorAndBroadcastEvent(err, "ec2:DescribeNetworkInterfaces") + awsAPIErrInc("DescribeNetworkInterfaces", err) return err } return innerErr @@ -1864,3 +1895,12 @@ func (cache *EC2InstanceMetadataCache) IsPrimaryENI(eniID string) bool { } return false } + +// CheckAPIErrorAndBroadcastEvent raises a MissingIAMPermissions warning event via eventRecorder when err is an +// awserr.Error with code UnauthorizedOperation; api names the failed call. Other errors or a nil recorder are no-ops. +func CheckAPIErrorAndBroadcastEvent(err error, api string) { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "UnauthorizedOperation" && eventRecorder != nil { + eventRecorder.BroadcastEvent(v1.EventTypeWarning, "MissingIAMPermissions", + fmt.Sprintf("Unauthorized operation: failed to call %v due to missing permissions. 
Please refer https://github.com/aws/amazon-vpc-cni-k8s/blob/master/docs/iam-policy.md to attach relevant policy to IAM role", api)) + } +} diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go index 9eb74d6784..2717cb93aa 100644 --- a/pkg/ipamd/ipamd.go +++ b/pkg/ipamd/ipamd.go @@ -462,8 +462,9 @@ func (c *IPAMContext) nodeInit() error { metadataResult, err := c.awsClient.DescribeAllENIs() if err != nil { - return errors.New("ipamd init: failed to retrieve attached ENIs info") + return errors.Wrap(err, "ipamd init: failed to retrieve attached ENIs info") } + log.Debugf("DescribeAllENIs success: ENIs: %d, tagged: %d", len(metadataResult.ENIMetadata), len(metadataResult.TagMap)) c.awsClient.SetCNIUnmanagedENIs(metadataResult.MultiCardENIIDs) c.setUnmanagedENIs(metadataResult.TagMap) @@ -938,7 +939,7 @@ func (c *IPAMContext) tryAssignIPs() (increasedPool bool, err error) { err = c.awsClient.AllocIPAddresses(eni.ID, 1) if err != nil { ipamdErrInc("increaseIPPoolAllocIPAddressesFailed") - return false, errors.Wrap(err, fmt.Sprintf("failed to allocate one IP addresses on ENI %s, err: %v", eni.ID, err)) + return false, errors.Wrap(err, fmt.Sprintf("failed to allocate one IP addresses on ENI %s, err ", eni.ID)) } } // This call to EC2 is needed to verify which IPs got attached to this ENI. 
@@ -1908,13 +1909,13 @@ func (c *IPAMContext) GetPod(podName, namespace string) (*corev1.Pod, error) { } // AnnotatePod annotates the pod with the provided key and value -func (c *IPAMContext) AnnotatePod(podNamespace, podName, key, val string) error { +func (c *IPAMContext) AnnotatePod(podName, podNamespace, key, val string) error { ctx := context.TODO() var err error err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { pod := &corev1.Pod{} - if pod, err = c.GetPod(podNamespace, podName); err != nil { + if pod, err = c.GetPod(podName, podNamespace); err != nil { return err } diff --git a/pkg/utils/eventrecorder/eventrecorder.go b/pkg/utils/eventrecorder/eventrecorder.go new file mode 100644 index 0000000000..c0668d16c1 --- /dev/null +++ b/pkg/utils/eventrecorder/eventrecorder.go @@ -0,0 +1,100 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +// Package eventrecorder is used to raise events on aws-node pods +package eventrecorder + +import ( + "context" + "errors" + "os" + + "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" + "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var log = logger.Get() +var myNodeName = os.Getenv("MY_NODE_NAME") +var eventRecorder *EventRecorder + +const ( + awsNode = "aws-node" + specNodeName = "spec.nodeName" + labelK8sapp = "k8s-app" +) + +// EventRecorder bundles the Kubernetes event recorder with the client BroadcastEvent uses to list aws-node pods +type EventRecorder struct { + recorder record.EventRecorder + k8sClient client.Client +} + +// InitEventRecorder creates the package-level EventRecorder returned by Get; it must run before Get is called +func InitEventRecorder(k8sClient client.Client) error { + clientSet, err := k8sapi.GetKubeClientSet() + if err != nil { + log.Fatalf("Error Fetching Kubernetes Client: %s", err) + return err + } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{ + Interface: clientSet.CoreV1().Events(""), + }) + + recorder := &EventRecorder{} + recorder.recorder = eventBroadcaster.NewRecorder(clientgoscheme.Scheme, corev1.EventSource{ + Component: awsNode, + Host: myNodeName, + }) + recorder.k8sClient = k8sClient + eventRecorder = recorder + log.Infof("eventrecoder set: %v", eventRecorder.recorder) + return nil +} + +// Get returns the EventRecorder created by InitEventRecorder and panics if it has not been initialized +func Get() *EventRecorder { + if eventRecorder == nil { + err := errors.New("error fetching event recoder, not initialized") + panic(err.Error()) + } + return eventRecorder +} + +// BroadcastEvent will raise event on aws-node with given type, reason, & message +func (e *EventRecorder) BroadcastEvent(eventType, reason, message string) { + // Get aws-node pod objects with label & field selectors + labelSelector := labels.SelectorFromSet(labels.Set{labelK8sapp: awsNode}) + fieldSelector := 
fields.SelectorFromSet(fields.Set{specNodeName: myNodeName}) + listOptions := client.ListOptions{ + LabelSelector: labelSelector, + FieldSelector: fieldSelector, + } + + var podList corev1.PodList + if err := e.k8sClient.List(context.TODO(), &podList, &listOptions); err != nil { + log.Errorf("Failed to get pods, cannot broadcast events: %v", err) + return + } + for i := range podList.Items { // index so Event receives the listed pod itself, not a per-iteration copy + log.Debugf("Broadcasting event on pod %s", podList.Items[i].Name) + e.recorder.Event(&podList.Items[i], eventType, reason, message) + } +} diff --git a/pkg/utils/eventrecorder/eventrecorder_test.go b/pkg/utils/eventrecorder/eventrecorder_test.go new file mode 100644 index 0000000000..314ae9e103 --- /dev/null +++ b/pkg/utils/eventrecorder/eventrecorder_test.go @@ -0,0 +1,105 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package eventrecorder + +import ( + "context" + "fmt" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var ctrl *gomock.Controller +var fakeRecorder *record.FakeRecorder + +type testMocks struct { + ctrl *gomock.Controller + mockK8sClient client.Client +} + +func setup(t *testing.T) *testMocks { + ctrl = gomock.NewController(t) + k8sSchema := runtime.NewScheme() + k8sClient := testclient.NewFakeClientWithScheme(k8sSchema) + clientgoscheme.AddToScheme(k8sSchema) + + return &testMocks{ + ctrl: ctrl, + mockK8sClient: k8sClient, + } +} + +func TestBroadcastEvents(t *testing.T) { + m := setup(t) + defer m.ctrl.Finish() + ctx := context.Background() + + fakeRecorder = record.NewFakeRecorder(3) + mockEventRecorder := &EventRecorder{ + recorder: fakeRecorder, + k8sClient: m.mockK8sClient, + } + + labels := map[string]string{"k8s-app": "aws-node"} + + pods := []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "mockPodWithLabelAndSpec", + Labels: labels, + }, + Spec: v1.PodSpec{ + NodeName: myNodeName, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "mockPodWithSpec", + }, + Spec: v1.PodSpec{ + NodeName: myNodeName, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "mockPod", + }, + }, + // No pod with only label selector in test- this will raise event as fake client does not filter on fields + } + + // Create the fake pods above; fail the test if the fake client rejects any of them + for i := range pods { + assert.NoError(t, mockEventRecorder.k8sClient.Create(ctx, &pods[i])) + } + + // Testing missing permissions event case: failed to call + reason := "MissingIAMPermission" + msg := "Failed to call ec2:DescribeNetworkInterfaces due to missing permissions. 
Please refer to https://github.com/aws/amazon-vpc-cni-k8s/blob/master/docs/iam-policy.md" + mockEventRecorder.BroadcastEvent(v1.EventTypeWarning, reason, msg) + if assert.Len(t, fakeRecorder.Events, 1) { // event should be recorded only on pod with req label selector & pod spec; guard the receive below so a missing event fails instead of blocking + expected := fmt.Sprintf("%s %s %s", v1.EventTypeWarning, reason, msg) + got := <-fakeRecorder.Events + assert.Equal(t, expected, got) + } +} diff --git a/test/framework/framework.go b/test/framework/framework.go index d2cdf258a3..f179859abd 100644 --- a/test/framework/framework.go +++ b/test/framework/framework.go @@ -14,6 +14,7 @@ package framework import ( + "context" "log" eniConfig "github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1" @@ -23,6 +24,7 @@ import ( "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s" sgp "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1beta1" . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" @@ -57,7 +59,10 @@ func New(options Options) *Framework { stopChan := ctrl.SetupSignalHandler() cache, err := cache.New(config, cache.Options{Scheme: k8sSchema}) Expect(err).NotTo(HaveOccurred()) - + err = cache.IndexField(context.TODO(), &v1.Event{}, "reason", func(o client.Object) []string { + return []string{o.(*v1.Event).Reason} + }) // default indexing only on ns, need this for ipamd_event_test + Expect(err).NotTo(HaveOccurred()) go func() { cache.Start(stopChan) }() diff --git a/test/framework/resources/k8s/manager.go b/test/framework/resources/k8s/manager.go index 097e41759b..aa8b28a595 100644 --- a/test/framework/resources/k8s/manager.go +++ b/test/framework/resources/k8s/manager.go @@ -32,6 +32,7 @@ type ResourceManagers interface { DaemonSetManager() resources.DaemonSetManager ConfigMapManager() resources.ConfigMapManager NetworkPolicyManager() resources.NetworkPolicyManager + EventManager() 
resources.EventManager } type defaultManager struct { @@ -45,6 +46,7 @@ type defaultManager struct { daemonSetManager resources.DaemonSetManager configMapManager resources.ConfigMapManager networkPolicyManager resources.NetworkPolicyManager + eventManager resources.EventManager } func NewResourceManager(k8sClient client.Client, @@ -60,6 +62,7 @@ func NewResourceManager(k8sClient client.Client, daemonSetManager: resources.NewDefaultDaemonSetManager(k8sClient), configMapManager: resources.NewConfigMapManager(k8sClient), networkPolicyManager: resources.NewNetworkPolicyManager(k8sClient), + eventManager: resources.NewEventManager(k8sClient), } } @@ -102,3 +105,7 @@ func (m *defaultManager) ConfigMapManager() resources.ConfigMapManager { func (m *defaultManager) NetworkPolicyManager() resources.NetworkPolicyManager { return m.networkPolicyManager } + +func (m *defaultManager) EventManager() resources.EventManager { + return m.eventManager +} diff --git a/test/framework/resources/k8s/resources/events.go b/test/framework/resources/k8s/resources/events.go new file mode 100644 index 0000000000..9592c3b1b5 --- /dev/null +++ b/test/framework/resources/k8s/resources/events.go @@ -0,0 +1,39 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package resources + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type EventManager interface { + GetEventsWithOptions(opts *client.ListOptions) (v1.EventList, error) +} + +type defaultEventManager struct { + k8sClient client.Client +} + +func NewEventManager(k8sClient client.Client) EventManager { + return &defaultEventManager{k8sClient: k8sClient} +} + +func (d defaultEventManager) GetEventsWithOptions(opts *client.ListOptions) (v1.EventList, error) { + eventList := v1.EventList{} + err := d.k8sClient.List(context.Background(), &eventList, opts) + return eventList, err +} diff --git a/test/integration-new/ipamd/ipamd_event_test.go b/test/integration-new/ipamd/ipamd_event_test.go new file mode 100644 index 0000000000..a907330e8c --- /dev/null +++ b/test/integration-new/ipamd/ipamd_event_test.go @@ -0,0 +1,123 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ipamd + +import ( + "strings" + "time" + + k8sUtil "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/utils" + "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const EKSCNIPolicyARN = "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy" +const AwsNodeLabelKey = "k8s-app" + +var _ = Describe("test aws-node pod event", func() { + + // Verifies aws-node pod events works as expected + Context("when iam role is missing VPC_CNI policy", func() { + var role string + + BeforeEach(func() { + // To get the role assumed by CNI, first check the ENV "AWS_ROLE_ARN" on aws-node to get the service account role + // If not found, get the node instance role + By("getting the iam role") + podList, err := f.K8sResourceManagers.PodManager().GetPodsWithLabelSelector(AwsNodeLabelKey, utils.AwsNodeName) + Expect(err).ToNot(HaveOccurred()) + for _, env := range podList.Items[0].Spec.Containers[0].Env { + if env.Name == "AWS_ROLE_ARN" { + role = strings.Split(env.Value, "/")[1] + } + } + + if role == "" { // get the node instance role + nodeList, err := f.K8sResourceManagers.NodeManager(). 
+ GetNodes(f.Options.NgNameLabelKey, f.Options.NgNameLabelVal) + Expect(err).ToNot(HaveOccurred()) + + instanceID := k8sUtil.GetInstanceIDFromNode(nodeList.Items[0]) + instance, err := f.CloudServices.EC2().DescribeInstance(instanceID) + Expect(err).ToNot(HaveOccurred()) + + By("getting the node instance role") + instanceProfileRoleName := strings.Split(*instance.IamInstanceProfile.Arn, "instance-profile/")[1] + instanceProfileOutput, err := f.CloudServices.IAM().GetInstanceProfile(instanceProfileRoleName) + Expect(err).ToNot(HaveOccurred()) + role = *instanceProfileOutput.InstanceProfile.Roles[0].RoleName + } + + By("detaching VPC_CNI policy and restart aws-node pods") + err = f.CloudServices.IAM().DetachRolePolicy(EKSCNIPolicyARN, role) + Expect(err).ToNot(HaveOccurred()) + RestartAwsNodePods() + + By("checking aws-node pods not running") + time.Sleep(utils.PollIntervalMedium) // allow time for aws-node to restart + podList, err = f.K8sResourceManagers.PodManager().GetPodsWithLabelSelector(AwsNodeLabelKey, utils.AwsNodeName) + Expect(err).ToNot(HaveOccurred()) + + for _, cond := range podList.Items[0].Status.Conditions { + if cond.Type == v1.PodReady { + Expect(cond.Status).To(BeEquivalentTo(v1.ConditionFalse)) + break + } + } + + }) + + AfterEach(func() { + By("attaching VPC_CNI policy and restart aws-node pods") + err = f.CloudServices.IAM().AttachRolePolicy(EKSCNIPolicyARN, role) + Expect(err).ToNot(HaveOccurred()) + RestartAwsNodePods() + + By("checking aws-node pods are running") + time.Sleep(utils.PollIntervalMedium * 2) // sleep to allow aws-node to restart + podList, err := f.K8sResourceManagers.PodManager().GetPodsWithLabelSelector(AwsNodeLabelKey, utils.AwsNodeName) + Expect(err).ToNot(HaveOccurred()) + for _, cond := range podList.Items[0].Status.Conditions { + if cond.Type != v1.PodReady { + continue + } + Expect(cond.Status).To(BeEquivalentTo(v1.ConditionTrue)) + break + } + }) + + It("unauthorized event must be raised on aws-node pod", func() { + 
listOpts := client.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{"reason": "MissingIAMPermissions"}), + Namespace: utils.AwsNodeNamespace, + } + eventList, err := f.K8sResourceManagers.EventManager().GetEventsWithOptions(&listOpts) + Expect(err).ToNot(HaveOccurred()) + Expect(eventList.Items).NotTo(BeEmpty()) + }) + }) +}) + +// RestartAwsNodePods lists the pods labelled AwsNodeLabelKey=utils.AwsNodeName and removes each via DeleteAndWaitTillPodDeleted +func RestartAwsNodePods() { + podList, err := f.K8sResourceManagers.PodManager().GetPodsWithLabelSelector(AwsNodeLabelKey, utils.AwsNodeName) + Expect(err).ToNot(HaveOccurred()) + for i := range podList.Items { + f.K8sResourceManagers.PodManager().DeleteAndWaitTillPodDeleted(&podList.Items[i]) + } +}