From 3d1a4e25a7cf34ee6b3016fd8f5678014b2c82e6 Mon Sep 17 00:00:00 2001 From: anirudhAgniRedhat Date: Fri, 8 Nov 2024 14:35:09 +0530 Subject: [PATCH] Added Review nits --- go.mod | 2 +- pkg/driver/aws-ebs/aws_ebs.go | 2 +- pkg/driver/aws-ebs/aws_ebs_tags_controller.go | 223 ++++++++---------- .../aws-ebs/aws_ebs_tags_controller_test.go | 75 ------ .../aws-ebs/aws_ebs_tags_retry_worker.go | 160 +++++++------ 5 files changed, 190 insertions(+), 272 deletions(-) diff --git a/go.mod b/go.mod index ca21a900f..25c1ffd2d 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/openshift/library-go v0.0.0-20241120135057-fc703a7407c9 github.com/prometheus/client_golang v1.19.1 github.com/spf13/cobra v1.8.1 + golang.org/x/time v0.5.0 gopkg.in/ini.v1 v1.67.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.31.1 @@ -108,7 +109,6 @@ require ( golang.org/x/sys v0.25.0 // indirect golang.org/x/term v0.24.0 // indirect golang.org/x/text v0.18.0 // indirect - golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240709173604-40e1e62336c5 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect diff --git a/pkg/driver/aws-ebs/aws_ebs.go b/pkg/driver/aws-ebs/aws_ebs.go index 3b831683a..0e13d1833 100644 --- a/pkg/driver/aws-ebs/aws_ebs.go +++ b/pkg/driver/aws-ebs/aws_ebs.go @@ -163,7 +163,7 @@ func GetAWSEBSOperatorControllerConfig(ctx context.Context, flavour generator.Cl cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, ctrl) } - volumeTagController := NewEBSVolumeTagsController(ctx, cfg.GetControllerName("EBSVolumeTagsController"), c, c.EventRecorder) + volumeTagController := NewEBSVolumeTagsController(cfg.GetControllerName("EBSVolumeTagsController"), c, c.EventRecorder) cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, volumeTagController) cfg.DeploymentInformers = 
append(cfg.DeploymentInformers, c.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Informer()) diff --git a/pkg/driver/aws-ebs/aws_ebs_tags_controller.go b/pkg/driver/aws-ebs/aws_ebs_tags_controller.go index e5332adb9..c907e6bcd 100644 --- a/pkg/driver/aws-ebs/aws_ebs_tags_controller.go +++ b/pkg/driver/aws-ebs/aws_ebs_tags_controller.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "golang.org/x/time/rate" "os" "sort" "time" @@ -12,7 +13,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -34,30 +34,30 @@ const ( driverName = "ebs.csi.aws.com" tagHashAnnotationKey = "ebs.openshift.io/volume-tags-hash" batchSize = 50 - - operationDelay = 2 * time.Second - operationBackoffFactor = 1.2 - operationRetryCount = 5 ) type EBSVolumeTagsController struct { name string commonClient *clients.Clients eventRecorder events.Recorder - failedQueue workqueue.RateLimitingInterface + failedQueue workqueue.TypedRateLimitingInterface[string] + rateLimiter *rate.Limiter } func NewEBSVolumeTagsController( - ctx context.Context, name string, commonClient *clients.Clients, eventRecorder events.Recorder) factory.Controller { + // 10 qps, 100 bucket size. 
+ rateLimiter := rate.NewLimiter(rate.Limit(10), 100) + c := &EBSVolumeTagsController{ name: name, commonClient: commonClient, eventRecorder: eventRecorder, - failedQueue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Hour)), + failedQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.NewTypedItemExponentialFailureRateLimiter[string](10*time.Second, 100*time.Hour)), + rateLimiter: rateLimiter, } return factory.New().WithSync( c.Sync, @@ -83,19 +83,14 @@ func (c *EBSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync return nil } - infra, err := c.getInfrastructure(ctx) + infra, err := c.getInfrastructure() if err != nil { return err } - var infraRegion = "" - if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil { - infraRegion = infra.Status.PlatformStatus.AWS.Region - } - ec2Client, err := c.getEC2Client(ctx, infraRegion) - if err != nil { - return err + if infra == nil { + return nil } - err = c.processInfrastructure(ctx, infra, ec2Client) + err = c.processInfrastructure(ctx, infra) if err != nil { return err } @@ -189,57 +184,30 @@ func writeCredentialsToTempFile(data []byte) (string, error) { } // getInfrastructure retrieves the Infrastructure resource in OpenShift -func (c *EBSVolumeTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) { - backoff := wait.Backoff{ - Duration: operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, - } - infra := &configv1.Infrastructure{} - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - infra, apiError = c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName) - if apiError != nil { - klog.Errorf("error listing infrastructures objects: %v", apiError) - return false, nil - } - if infra != nil { - return true, nil - } - return false, nil - }) +func (c 
*EBSVolumeTagsController) getInfrastructure() (*configv1.Infrastructure, error) { + infra, err := c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName) + if err != nil { + klog.Errorf("error listing infrastructures objects: %v", err) + return nil, err + } return infra, err } func (c *EBSVolumeTagsController) getEBSCloudCredSecret(ctx context.Context) (*v1.Secret, error) { - backoff := wait.Backoff{ - Duration: operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, - } - var awsCreds *v1.Secret - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - awsCreds, apiError = c.commonClient.KubeClient.CoreV1().Secrets(awsEBSSecretNamespace).Get(ctx, awsEBSSecretName, metav1.GetOptions{}) - if apiError != nil { - klog.Errorf("error getting secret object: %v", apiError) - return false, nil - } - if awsCreds != nil { - return true, nil - } - return false, nil - }) - return awsCreds, err + awsCreds, err := c.commonClient.KubeInformers.InformersFor(awsEBSSecretNamespace).Core().V1().Secrets().Lister().Secrets(awsEBSSecretNamespace).Get(awsEBSSecretName) + if err != nil { + klog.Errorf("error getting secret object: %v", err) + return nil, err + } + return awsCreds, nil } // processInfrastructure processes the Infrastructure resource and updates EBS tags -func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure, ec2Client *ec2.EC2) error { +func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure) error { if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil && infra.Status.PlatformStatus.AWS.ResourceTags != nil { - awsInfra := infra.Status.PlatformStatus.AWS - err := c.fetchPVsAndUpdateTags(ctx, awsInfra.ResourceTags, ec2Client) + err := c.fetchPVsAndUpdateTags(ctx, infra) if err != nil { klog.Errorf("Error processing 
PVs for infrastructure update: %v", err) return err @@ -249,14 +217,14 @@ func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, inf } // fetchPVsAndUpdateTags retrieves all PVs and updates the AWS EBS tags in batches of 100 -func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, resourceTags []configv1.AWSResourceTag, ec2Client *ec2.EC2) error { - pvs, err := c.listPersistentVolumesWithRetry(ctx) +func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, infra *configv1.Infrastructure) error { + pvs, err := c.listPersistentVolumes() if err != nil { return fmt.Errorf("error fetching PVs: %v", err) } // Compute the hash for the new set of tags - newTagsHash := computeTagsHash(resourceTags) - pvsToBeUpdated := filterUpdatableVolumes(pvs, newTagsHash) + newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags) + pvsToBeUpdated := c.filterUpdatableVolumes(pvs, newTagsHash) // If there are no volumes to update, return early if len(pvsToBeUpdated) == 0 { @@ -264,6 +232,15 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res return nil } + var infraRegion = "" + if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil { + infraRegion = infra.Status.PlatformStatus.AWS.Region + } + ec2Client, err := c.getEC2Client(ctx, infraRegion) + if err != nil { + return err + } + // Process the volumes in batches for i := 0; i < len(pvsToBeUpdated); i += batchSize { end := i + batchSize @@ -273,7 +250,7 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res batch := pvsToBeUpdated[i:end] // Update tags on AWS EBS volumes - err = c.updateEBSTags(batch, ec2Client, resourceTags) + err = c.updateEBSTags(ctx, batch, ec2Client, infra.Status.PlatformStatus.AWS.ResourceTags) if err != nil { c.handleTagUpdateFailure(batch, err) continue @@ -282,10 +259,10 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx 
context.Context, res // Update PV annotations after successfully updating the tags in AWS for _, volume := range batch { // Set the new tag hash annotation in the PV object - setPVTagHash(volume, newTagsHash) + updatedVolume := setPVTagHash(volume, newTagsHash) // Update the PV with the new annotations - err = c.updateVolumeWithRetry(ctx, volume) + err = c.updateVolume(ctx, updatedVolume) if err != nil { klog.Errorf("Error updating PV annotations for volume %s: %v", volume.Name, err) c.failedQueue.AddRateLimited(volume.Name) // Retry updating annotation if update fails @@ -297,71 +274,57 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res return nil } -// updateEBSTags updates the tags of an AWS EBS volume -func (c *EBSVolumeTagsController) updateEBSTags(pvBatch []*v1.PersistentVolume, ec2Client *ec2.EC2, resourceTags []configv1.AWSResourceTag) error { - - // Merge the existing tags with new resource tags - mergedTags := newAndUpdatedTags(resourceTags) +// updateEBSTags updates the tags of an AWS EBS volume with rate limiting +func (c *EBSVolumeTagsController) updateEBSTags(ctx context.Context, pvBatch []*v1.PersistentVolume, ec2Client *ec2.EC2, + resourceTags []configv1.AWSResourceTag) error { + err := c.rateLimiter.Wait(ctx) + // If context is cancelled or rate limit cannot be acquired, return error + if err != nil { + klog.Errorf("Rate limiter error: %v", err) + return err + } + // Prepare tags + tags := newAndUpdatedTags(resourceTags) // Create or update the tags - _, err := ec2Client.CreateTags(&ec2.CreateTagsInput{ + _, err = ec2Client.CreateTags(&ec2.CreateTagsInput{ Resources: pvsToResourceIDs(pvBatch), - Tags: mergedTags, + Tags: tags, }) - if err != nil { - return fmt.Errorf("error creating tags for volume %v: %v", pvBatch, err) + return err } return nil } -func (c *EBSVolumeTagsController) listPersistentVolumesWithRetry(ctx context.Context) ([]*v1.PersistentVolume, error) { - backoff := wait.Backoff{ - Duration: 
operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, - } - var pvList []*v1.PersistentVolume - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - pvList, apiError = c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything()) - if apiError != nil { - klog.Errorf("error listing volumes objects: %v", apiError) - return false, nil - } - return true, nil - }) - return pvList, err +func (c *EBSVolumeTagsController) listPersistentVolumes() ([]*v1.PersistentVolume, error) { + pvList, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything()) + if err != nil { + klog.Errorf("error listing volumes objects: %v", err) + return nil, err + } + return pvList, nil } -func (c *EBSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error { - backoff := wait.Backoff{ - Duration: operationDelay, - Factor: operationBackoffFactor, - Steps: operationRetryCount, - } - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - var apiError error - _, apiError = c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}) - if apiError != nil { - klog.Errorf("error updating volume object %s: %v", pv.Name, apiError) - return false, nil - } - return true, nil - }) - return err +func (c *EBSVolumeTagsController) updateVolume(ctx context.Context, pv *v1.PersistentVolume) error { + _, err := c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("error updating volume object %s: %v", pv.Name, err) + return err + } + return nil } -func (c *EBSVolumeTagsController) handleTagUpdateFailure(batch []*v1.PersistentVolume, error error) { - errorMessage := fmt.Sprintf("Error updating tags for volume %v: %v", batch, 
error) +func (c *EBSVolumeTagsController) handleTagUpdateFailure(batch []*v1.PersistentVolume, updateErr error) { for _, pv := range batch { - klog.Errorf("Error updating volume %v tags: %v", pv.Name, errorMessage) + klog.Errorf("Error updating volume %v tags: %v", pv.Name, updateErr) c.failedQueue.AddRateLimited(pv.Name) } var pvNames []string for _, pv := range batch { pvNames = append(pvNames, pv.Name) } + errorMessage := fmt.Sprintf("Error updating tags for volume %v: %v", pvNames, updateErr) // Emit a warning event for the failure c.eventRecorder.Warning("EBSVolumeTagsUpdateFailed", fmt.Sprintf("Failed to update tags for batch %v: %v", pvNames, errorMessage)) } @@ -379,17 +342,28 @@ func newAndUpdatedTags(resourceTags []configv1.AWSResourceTag) []*ec2.Tag { return tags } -func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume { - var pvsToBeUpdated = make([]*v1.PersistentVolume, 0) +func (c *EBSVolumeTagsController) filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume { + var updatablePVs []*v1.PersistentVolume + for _, volume := range volumes { - if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == driverName { - existingHash := getPVTagHash(volume) - if existingHash == "" || existingHash != newTagsHash { - pvsToBeUpdated = append(pvsToBeUpdated, volume) - } + // Check if the volume is a CSI volume with the correct driver + if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == driverName && + // Ensure the volume is not already in the failed queue + !c.isVolumeInFailedQueue(volume.Name) && + // Include volumes whose tag hash is missing or different from the new hash + getPVTagHash(volume) != newTagsHash { + + // Add the volume to the list of updatable volumes + updatablePVs = append(updatablePVs, volume) } } - return pvsToBeUpdated + return updatablePVs +} + +// isVolumeInFailedQueue checks if a volume name is currently in the failed queue +func (c 
*EBSVolumeTagsController) isVolumeInFailedQueue(volumeName string) bool { + // NumRequeues counts AddRateLimited calls since the last Forget, so a non-zero value means a retry is still pending + return c.failedQueue.NumRequeues(volumeName) > 0 } func pvsToResourceIDs(volumes []*v1.PersistentVolume) []*string { @@ -401,14 +375,19 @@ func pvsToResourceIDs(volumes []*v1.PersistentVolume) []*string { } // setPVTagHash stores the hash in the PV annotations. -func setPVTagHash(pv *v1.PersistentVolume, hash string) { +func setPVTagHash(pv *v1.PersistentVolume, hash string) *v1.PersistentVolume { + // Create a deep copy of the PersistentVolume to avoid modifying the cached object + pvCopy := pv.DeepCopy() + // Ensure the PV has an annotations map - if pv.Annotations == nil { - pv.Annotations = make(map[string]string) + if pvCopy.Annotations == nil { + pvCopy.Annotations = make(map[string]string) } // Set or update the tag hash annotation - pv.Annotations[tagHashAnnotationKey] = hash + pvCopy.Annotations[tagHashAnnotationKey] = hash + + return pvCopy } // getPVTagHash gets the hash stored in the PV annotations. 
diff --git a/pkg/driver/aws-ebs/aws_ebs_tags_controller_test.go b/pkg/driver/aws-ebs/aws_ebs_tags_controller_test.go index 4dd4d0a02..7c4b8723d 100644 --- a/pkg/driver/aws-ebs/aws_ebs_tags_controller_test.go +++ b/pkg/driver/aws-ebs/aws_ebs_tags_controller_test.go @@ -1,89 +1,14 @@ package aws_ebs import ( - "context" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" configv1 "github.com/openshift/api/config/v1" - opv1 "github.com/openshift/api/operator/v1" - fakeconfig "github.com/openshift/client-go/config/clientset/versioned/fake" - "github.com/openshift/client-go/config/informers/externalversions" - "github.com/openshift/csi-operator/pkg/clients" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" "testing" ) -type FakeOperator struct { - metav1.ObjectMeta - Spec opv1.OperatorSpec - Status opv1.OperatorStatus -} - -func TestEBSVolumeTagsController_Sync(t *testing.T) { - ctx := context.TODO() - - fakeConfigClient := fakeconfig.NewSimpleClientset() - informerFactory := externalversions.NewSharedInformerFactory(fakeConfigClient, 0) - informerFactory.Config().V1().Infrastructures().Informer() - - // Test getEC2Client with valid and invalid AWS credentials - t.Run("TestGetEC2Client", func(t *testing.T) { - fakeCoreClient := fake.NewSimpleClientset() - - // Case 1: Valid credentials - validSecret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: awsEBSSecretName, - Namespace: awsEBSSecretNamespace, - }, - Data: map[string][]byte{ - "aws_access_key_id": []byte("test-access-key"), - "aws_secret_access_key": []byte("test-secret-key"), - }, - } - _, err := fakeCoreClient.CoreV1().Secrets(awsEBSSecretNamespace).Create(ctx, validSecret, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Failed to create secret for valid credentials: %v", err) - } - - controller := &EBSVolumeTagsController{ - commonClient: &clients.Clients{KubeClient: fakeCoreClient}, - } 
- - awsRegion := "us-east-1" - ec2Client, err := controller.getEC2Client(ctx, awsRegion) - if err != nil { - t.Fatalf("Expected EC2 client to be created without errors for valid credentials, but got: %v", err) - } - if ec2Client == nil { - t.Fatalf("Expected non-nil EC2 client, but got nil") - } - - // Case 2: Missing credentials - invalidSecret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: awsEBSSecretName, - Namespace: awsEBSSecretNamespace, - }, - Data: map[string][]byte{ - "some_other_field": []byte("some-value"), - }, - } - _, err = fakeCoreClient.CoreV1().Secrets(awsEBSSecretNamespace).Update(ctx, invalidSecret, metav1.UpdateOptions{}) - if err != nil { - t.Fatalf("Failed to create secret for invalid credentials: %v", err) - } - - _, err = controller.getEC2Client(ctx, awsRegion) - if err == nil { - t.Fatalf("Expected error for missing AWS credentials, but got none") - } - }) -} - // TestNewAndUpdatedTags checks that newAndUpdatedTags converts OpenShift AWS resource tags to AWS ec2.Tags correctly func TestNewAndUpdatedTags(t *testing.T) { tests := []struct { diff --git a/pkg/driver/aws-ebs/aws_ebs_tags_retry_worker.go b/pkg/driver/aws-ebs/aws_ebs_tags_retry_worker.go index 188c95b40..9dff7138f 100644 --- a/pkg/driver/aws-ebs/aws_ebs_tags_retry_worker.go +++ b/pkg/driver/aws-ebs/aws_ebs_tags_retry_worker.go @@ -3,6 +3,8 @@ package aws_ebs import ( "context" "errors" + configv1 "github.com/openshift/api/config/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -16,86 +18,98 @@ func (c *EBSVolumeTagsController) startFailedQueueWorker(ctx context.Context, sy select { case <-ctx.Done(): klog.Infof("Context canceled, stopping failed queue worker for EBS Volume Tags") - return errors.New("context canceled, stopping failed queue worker for EBS Volume Tags") // Stop the goroutine when the context is canceled + return errors.New("context canceled, stopping failed queue worker for EBS Volume Tags") default: - // 
Get the next failed volume from the queue item, quit := c.failedQueue.Get() if quit { klog.Infof("Failed queue worker is shutting down") return errors.New("failed queue worker is shutting down") } + c.processFailedVolume(ctx, item) + } + } +} + +// processFailedVolume processes a single failed volume from the queue +func (c *EBSVolumeTagsController) processFailedVolume(ctx context.Context, pvName string) { + defer c.failedQueue.Done(pvName) + + klog.Infof("Retrying failed volume: %v", pvName) + + infra, err := c.getInfrastructure() + if err != nil { + klog.Errorf("Failed to get infrastructure object: %v", err) + c.failedQueue.AddRateLimited(pvName) + return + } + if infra.Status.PlatformStatus == nil || infra.Status.PlatformStatus.AWS == nil || len(infra.Status.PlatformStatus.AWS.Region) == 0 { + klog.Infof("Skipping failed volume %v because no AWS region defined", pvName) + c.failedQueue.AddRateLimited(pvName) + return + } - func(obj interface{}) { - defer c.failedQueue.Done(obj) - - pvName, ok := obj.(string) - if !ok { - klog.Errorf("Invalid volume name type in failed queue, skipping") - c.failedQueue.Forget(obj) // Remove invalid object from the queue - return - } - - klog.Infof("Retrying failed volume: %v", pvName) - - // Get the infrastructure and EC2 client - infra, err := c.getInfrastructure(ctx) - if err != nil { - klog.Errorf("Failed to get infrastructure for retry: %v", err) - c.failedQueue.AddRateLimited(pvName) // Retry the failed volume again with backoff - return - } - var infraRegion = "" - if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil { - infraRegion = infra.Status.PlatformStatus.AWS.Region - } - ec2Client, err := c.getEC2Client(ctx, infraRegion) - if err != nil { - klog.Errorf("Failed to get EC2 client for retry: %v", err) - c.failedQueue.AddRateLimited(pvName) // Retry the failed volume again with backoff - return - } - - // Compute the new tag hash for comparison - newTagsHash := 
computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags) - - // Retrieve the PV associated with the volume name - pv, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().Get(pvName) - if err != nil { - klog.Errorf("Failed to retrieve PV for volume %s: %v", pvName, err) - c.failedQueue.AddRateLimited(pvName) - return - } - - // Get existing tag hash from PV annotations - existingHash := getPVTagHash(pv) - - // Check if tags need to be updated by comparing hashes - if existingHash == "" || existingHash != newTagsHash { - // Update EBS tags on AWS for this volume - err = c.updateEBSTags([]*v1.PersistentVolume{pv}, ec2Client, infra.Status.PlatformStatus.AWS.ResourceTags) - if err != nil { - c.handleTagUpdateFailure([]*v1.PersistentVolume{pv}, err) - return - } - - // After successful update, store the new hash in the PV annotations - setPVTagHash(pv, newTagsHash) - - // Update the PV with the new annotation - err = c.updateVolumeWithRetry(ctx, pv) - if err != nil { - klog.Errorf("Error updating PV annotations for volume %s: %v", pvName, err) - c.failedQueue.AddRateLimited(pvName) // Retry updating annotation if update fails - return - } - - klog.Infof("Successfully updated PV annotations for volume %s", pvName) - } else { - klog.Infof("No update needed for volume %s as hashes match", pvName) - } - c.failedQueue.Forget(pvName) - - }(item) + pv, err := c.getPersistentVolume(pvName) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("Skipping failed volume %v because it does not exist", pvName) + c.failedQueue.Forget(pvName) + return } + klog.Errorf("Failed to get persistent volume %v: %v", pvName, err) + c.failedQueue.AddRateLimited(pvName) + return + } + + if c.needsTagUpdate(infra, pv) { + c.updateTags(ctx, pv, infra.Status.PlatformStatus.AWS.Region, infra.Status.PlatformStatus.AWS.ResourceTags) + } else { + klog.Infof("No update needed for volume %s as hashes match", pvName) + c.failedQueue.Forget(pvName) + } 
+} + +// getPersistentVolume retrieves the PersistentVolume object by its name +func (c *EBSVolumeTagsController) getPersistentVolume(pvName string) (*v1.PersistentVolume, error) { + pv, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().Get(pvName) + if err != nil { + klog.Errorf("Failed to retrieve PV for volume %s: %v", pvName, err) + return nil, err + } + return pv, nil +} + +// needsTagUpdate checks if the PersistentVolume tags need to be updated +func (c *EBSVolumeTagsController) needsTagUpdate(infra *configv1.Infrastructure, pv *v1.PersistentVolume) bool { + existingHash := getPVTagHash(pv) + newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags) + return existingHash == "" || existingHash != newTagsHash +} + +// updateTags updates the EBS tags on AWS and the PersistentVolume annotations +func (c *EBSVolumeTagsController) updateTags(ctx context.Context, pv *v1.PersistentVolume, region string, resourceTags []configv1.AWSResourceTag) { + ec2Client, err := c.getEC2Client(ctx, region) + if err != nil { + klog.Errorf("Failed to get EC2 client for retry: %v", err) + c.failedQueue.AddRateLimited(pv.Name) + return } + + err = c.updateEBSTags(ctx, []*v1.PersistentVolume{pv}, ec2Client, resourceTags) + if err != nil { + c.handleTagUpdateFailure([]*v1.PersistentVolume{pv}, err) + return + } + + newTagsHash := computeTagsHash(resourceTags) + updatedVolume := setPVTagHash(pv, newTagsHash) + + err = c.updateVolume(ctx, updatedVolume) + if err != nil { + klog.Errorf("Error updating PV annotations for volume %s: %v", pv.Name, err) + c.failedQueue.AddRateLimited(pv.Name) + return + } + + klog.Infof("Successfully updated PV annotations for volume %s", pv.Name) + c.failedQueue.Forget(pv.Name) }