
Commit

Added Review nits
anirudhAgniRedhat committed Dec 16, 2024
1 parent b9746ee commit 3d1a4e2
Showing 5 changed files with 190 additions and 272 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -20,6 +20,7 @@ require (
github.com/openshift/library-go v0.0.0-20241120135057-fc703a7407c9
github.com/prometheus/client_golang v1.19.1
github.com/spf13/cobra v1.8.1
golang.org/x/time v0.5.0
gopkg.in/ini.v1 v1.67.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.31.1
@@ -108,7 +109,6 @@ require (
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240709173604-40e1e62336c5 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
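golang.org/x/time moves from the indirect block to a direct requirement because the tags controller now imports golang.org/x/time/rate. A minimal, self-contained sketch of the limiter API being pulled in, using the same 10 qps / burst-100 settings as the controller (not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 10 tokens per second, bucket size 100, mirroring NewEBSVolumeTagsController.
	limiter := rate.NewLimiter(rate.Limit(10), 100)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Wait blocks until a token is available or the context is done.
	if err := limiter.Wait(ctx); err != nil {
		fmt.Println("rate limiter:", err)
		return
	}
	fmt.Println("token acquired")
}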
2 changes: 1 addition & 1 deletion pkg/driver/aws-ebs/aws_ebs.go
@@ -163,7 +163,7 @@ func GetAWSEBSOperatorControllerConfig(ctx context.Context, flavour generator.Cl
cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, ctrl)
}

volumeTagController := NewEBSVolumeTagsController(ctx, cfg.GetControllerName("EBSVolumeTagsController"), c, c.EventRecorder)
volumeTagController := NewEBSVolumeTagsController(cfg.GetControllerName("EBSVolumeTagsController"), c, c.EventRecorder)
cfg.ExtraControlPlaneControllers = append(cfg.ExtraControlPlaneControllers, volumeTagController)
cfg.DeploymentInformers = append(cfg.DeploymentInformers, c.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Informer())

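The constructor call above drops its ctx argument (each reconcile now receives a context through Sync, shown in the controller file below), and the PersistentVolume informer is registered so the controller can list PVs from cache. The factory starts and syncs registered informers itself; a hedged sketch of the manual equivalent, with stopCh illustrative (not part of this commit):

informer := c.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Informer()
go informer.Run(stopCh)
// Listers only serve from a synced cache.
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
	return fmt.Errorf("timed out waiting for the PersistentVolume informer to sync")
}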
223 changes: 101 additions & 122 deletions pkg/driver/aws-ebs/aws_ebs_tags_controller.go
@@ -5,14 +5,14 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"golang.org/x/time/rate"
"os"
"sort"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

@@ -34,30 +34,30 @@ const (
driverName = "ebs.csi.aws.com"
tagHashAnnotationKey = "ebs.openshift.io/volume-tags-hash"
batchSize = 50

operationDelay = 2 * time.Second
operationBackoffFactor = 1.2
operationRetryCount = 5
)

type EBSVolumeTagsController struct {
name string
commonClient *clients.Clients
eventRecorder events.Recorder
failedQueue workqueue.RateLimitingInterface
failedQueue workqueue.TypedRateLimitingInterface[string]
rateLimiter *rate.Limiter
}

func NewEBSVolumeTagsController(
ctx context.Context,
name string,
commonClient *clients.Clients,
eventRecorder events.Recorder) factory.Controller {

// 10 qps, 100 bucket size.
rateLimiter := rate.NewLimiter(rate.Limit(10), 100)

c := &EBSVolumeTagsController{
name: name,
commonClient: commonClient,
eventRecorder: eventRecorder,
failedQueue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Hour)),
failedQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.NewTypedItemExponentialFailureRateLimiter[string](10*time.Second, 100*time.Hour)),
rateLimiter: rateLimiter,
}
return factory.New().WithSync(
c.Sync,
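The untyped workqueue.RateLimitingInterface is deprecated in recent client-go releases in favor of the generic typed queue; the queue now carries PV names as plain strings, and the per-item backoff window changes from 1s–1000h to 10s–100h. A small sketch of the typed API with an illustrative item (not part of this commit):

q := workqueue.NewTypedRateLimitingQueue[string](
	workqueue.NewTypedItemExponentialFailureRateLimiter[string](10*time.Second, 100*time.Hour),
)
q.AddRateLimited("pv-example")  // re-enqueue with per-item exponential backoff
_ = q.NumRequeues("pv-example") // 1 after the line above
q.Forget("pv-example")          // reset this item's backoff state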
@@ -83,19 +83,14 @@ func (c *EBSVolumeTagsController) Sync(ctx context.Context, syncCtx factory.Sync
return nil
}

infra, err := c.getInfrastructure(ctx)
infra, err := c.getInfrastructure()
if err != nil {
return err
}
var infraRegion = ""
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil {
infraRegion = infra.Status.PlatformStatus.AWS.Region
}
ec2Client, err := c.getEC2Client(ctx, infraRegion)
if err != nil {
return err
if infra == nil {
return nil
}
err = c.processInfrastructure(ctx, infra, ec2Client)
err = c.processInfrastructure(ctx, infra)
if err != nil {
return err
}
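Sync now tolerates a missing Infrastructure object by returning nil instead of an error. When reading from a lister, separating "not found" from transient failures is the usual pattern; a hedged sketch, where infraLister is illustrative and apierrors is k8s.io/apimachinery/pkg/api/errors (not part of this commit):

infra, err := infraLister.Get("cluster")
if apierrors.IsNotFound(err) {
	return nil // nothing to reconcile yet; a later informer event re-triggers Sync
}
if err != nil {
	return err // transient; the factory re-queues with backoff
}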
@@ -189,57 +184,30 @@ func writeCredentialsToTempFile(data []byte) (string, error) {
}

// getInfrastructure retrieves the Infrastructure resource in OpenShift
func (c *EBSVolumeTagsController) getInfrastructure(ctx context.Context) (*configv1.Infrastructure, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
infra := &configv1.Infrastructure{}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
infra, apiError = c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName)
if apiError != nil {
klog.Errorf("error listing infrastructures objects: %v", apiError)
return false, nil
}
if infra != nil {
return true, nil
}
return false, nil
})
func (c *EBSVolumeTagsController) getInfrastructure() (*configv1.Infrastructure, error) {
infra, err := c.commonClient.ConfigInformers.Config().V1().Infrastructures().Lister().Get(infrastructureName)
if err != nil {
klog.Errorf("error listing infrastructures objects: %v", err)
return nil, err
}
return infra, err
}

func (c *EBSVolumeTagsController) getEBSCloudCredSecret(ctx context.Context) (*v1.Secret, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
var awsCreds *v1.Secret
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
awsCreds, apiError = c.commonClient.KubeClient.CoreV1().Secrets(awsEBSSecretNamespace).Get(ctx, awsEBSSecretName, metav1.GetOptions{})
if apiError != nil {
klog.Errorf("error getting secret object: %v", apiError)
return false, nil
}
if awsCreds != nil {
return true, nil
}
return false, nil
})
return awsCreds, err
awsCreds, err := c.commonClient.KubeInformers.InformersFor(awsEBSSecretNamespace).Core().V1().Secrets().Lister().Secrets(awsEBSSecretNamespace).Get(awsEBSSecretName)
if err != nil {
klog.Errorf("error getting secret object: %v", err)
return nil, err
}
return awsCreds, nil

}
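Both getInfrastructure and getEBSCloudCredSecret now read from shared informer caches rather than issuing live API calls wrapped in wait.ExponentialBackoffWithContext: a cache read cannot transiently fail the way a network call can, so the retry scaffolding disappears. The one caveat is that cached objects are shared pointers and must never be mutated in place; a sketch with an illustrative secretLister (not part of this commit):

cached, err := secretLister.Secrets(awsEBSSecretNamespace).Get(awsEBSSecretName)
if err != nil {
	return nil, err
}
secret := cached.DeepCopy() // never write to the object the cache handed back
secret.Labels = map[string]string{"example": "safe-on-the-copy"}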

// processInfrastructure processes the Infrastructure resource and updates EBS tags
func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure, ec2Client *ec2.EC2) error {
func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, infra *configv1.Infrastructure) error {
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil &&
infra.Status.PlatformStatus.AWS.ResourceTags != nil {
awsInfra := infra.Status.PlatformStatus.AWS
err := c.fetchPVsAndUpdateTags(ctx, awsInfra.ResourceTags, ec2Client)
err := c.fetchPVsAndUpdateTags(ctx, infra)
if err != nil {
klog.Errorf("Error processing PVs for infrastructure update: %v", err)
return err
@@ -249,21 +217,30 @@ func (c *EBSVolumeTagsController) processInfrastructure(ctx context.Context, inf
}

// fetchPVsAndUpdateTags retrieves all PVs and updates the AWS EBS tags in batches of batchSize (50)
func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, resourceTags []configv1.AWSResourceTag, ec2Client *ec2.EC2) error {
pvs, err := c.listPersistentVolumesWithRetry(ctx)
func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, infra *configv1.Infrastructure) error {
pvs, err := c.listPersistentVolumes()
if err != nil {
return fmt.Errorf("error fetching PVs: %v", err)
}
// Compute the hash for the new set of tags
newTagsHash := computeTagsHash(resourceTags)
pvsToBeUpdated := filterUpdatableVolumes(pvs, newTagsHash)
newTagsHash := computeTagsHash(infra.Status.PlatformStatus.AWS.ResourceTags)
pvsToBeUpdated := c.filterUpdatableVolumes(pvs, newTagsHash)

// If there are no volumes to update, return early
if len(pvsToBeUpdated) == 0 {
klog.Infof("No volume tags to update as hashes are unchanged")
return nil
}

var infraRegion = ""
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil {
infraRegion = infra.Status.PlatformStatus.AWS.Region
}
ec2Client, err := c.getEC2Client(ctx, infraRegion)
if err != nil {
return err
}

// Process the volumes in batches
for i := 0; i < len(pvsToBeUpdated); i += batchSize {
end := i + batchSize
@@ -273,7 +250,7 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res
batch := pvsToBeUpdated[i:end]

// Update tags on AWS EBS volumes
err = c.updateEBSTags(batch, ec2Client, resourceTags)
err = c.updateEBSTags(ctx, batch, ec2Client, infra.Status.PlatformStatus.AWS.ResourceTags)
if err != nil {
c.handleTagUpdateFailure(batch, err)
continue
@@ -282,10 +259,10 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res
// Update PV annotations after successfully updating the tags in AWS
for _, volume := range batch {
// Set the new tag hash annotation in the PV object
setPVTagHash(volume, newTagsHash)
updatedVolume := setPVTagHash(volume, newTagsHash)

// Update the PV with the new annotations
err = c.updateVolumeWithRetry(ctx, volume)
err = c.updateVolume(ctx, updatedVolume)
if err != nil {
klog.Errorf("Error updating PV annotations for volume %s: %v", volume.Name, err)
c.failedQueue.AddRateLimited(volume.Name) // Retry updating annotation if update fails
@@ -297,71 +274,57 @@ func (c *EBSVolumeTagsController) fetchPVsAndUpdateTags(ctx context.Context, res
return nil
}
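The loop above is the standard slice-chunking pattern: batchSize (50) bounds each CreateTags request, and a failure parks only that batch's volumes on the failed queue instead of aborting the whole pass. Its generic form, with process a hypothetical per-batch handler (not part of this commit):

for i := 0; i < len(items); i += batchSize {
	end := i + batchSize
	if end > len(items) {
		end = len(items) // clamp the final, possibly short, batch
	}
	process(items[i:end])
}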

// updateEBSTags updates the tags of an AWS EBS volume
func (c *EBSVolumeTagsController) updateEBSTags(pvBatch []*v1.PersistentVolume, ec2Client *ec2.EC2, resourceTags []configv1.AWSResourceTag) error {

// Merge the existing tags with new resource tags
mergedTags := newAndUpdatedTags(resourceTags)
// updateEBSTags updates the tags of a batch of AWS EBS volumes, rate limited
func (c *EBSVolumeTagsController) updateEBSTags(ctx context.Context, pvBatch []*v1.PersistentVolume, ec2Client *ec2.EC2,
resourceTags []configv1.AWSResourceTag) error {
err := c.rateLimiter.Wait(ctx)
// If context is cancelled or rate limit cannot be acquired, return error
if err != nil {
klog.Errorf("Rate limiter error: %v", err)
return err
}

// Prepare tags
tags := newAndUpdatedTags(resourceTags)
// Create or update the tags
_, err := ec2Client.CreateTags(&ec2.CreateTagsInput{
_, err = ec2Client.CreateTags(&ec2.CreateTagsInput{
Resources: pvsToResourceIDs(pvBatch),
Tags: mergedTags,
Tags: tags,
})

if err != nil {
return fmt.Errorf("error creating tags for volume %v: %v", pvBatch, err)
return err
}
return nil
}
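For reference, the aws-sdk-go v1 types exercised by the call above; the volume ID and tag values are illustrative (not part of this commit):

input := &ec2.CreateTagsInput{
	Resources: []*string{aws.String("vol-0123456789abcdef0")},
	Tags: []*ec2.Tag{
		{Key: aws.String("environment"), Value: aws.String("example")},
	},
}
if _, err := ec2Client.CreateTags(input); err != nil {
	return err // surfaced to the caller, which parks the batch for retry
}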

func (c *EBSVolumeTagsController) listPersistentVolumesWithRetry(ctx context.Context) ([]*v1.PersistentVolume, error) {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
var pvList []*v1.PersistentVolume
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
pvList, apiError = c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything())
if apiError != nil {
klog.Errorf("error listing volumes objects: %v", apiError)
return false, nil
}
return true, nil
})
return pvList, err
func (c *EBSVolumeTagsController) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
pvList, err := c.commonClient.KubeInformers.InformersFor("").Core().V1().PersistentVolumes().Lister().List(labels.Everything())
if err != nil {
klog.Errorf("error listing volumes objects: %v", err)
return nil, err
}
return pvList, nil
}

func (c *EBSVolumeTagsController) updateVolumeWithRetry(ctx context.Context, pv *v1.PersistentVolume) error {
backoff := wait.Backoff{
Duration: operationDelay,
Factor: operationBackoffFactor,
Steps: operationRetryCount,
}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
var apiError error
_, apiError = c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if apiError != nil {
klog.Errorf("error updating volume object %s: %v", pv.Name, apiError)
return false, nil
}
return true, nil
})
return err
func (c *EBSVolumeTagsController) updateVolume(ctx context.Context, pv *v1.PersistentVolume) error {
_, err := c.commonClient.KubeClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error updating volume object %s: %v", pv.Name, err)
return err
}
return nil
}

func (c *EBSVolumeTagsController) handleTagUpdateFailure(batch []*v1.PersistentVolume, error error) {
errorMessage := fmt.Sprintf("Error updating tags for volume %v: %v", batch, error)
func (c *EBSVolumeTagsController) handleTagUpdateFailure(batch []*v1.PersistentVolume, updateErr error) {
for _, pv := range batch {
klog.Errorf("Error updating volume %v tags: %v", pv.Name, errorMessage)
klog.Errorf("Error updating volume %v tags: %v", pv.Name, updateErr)
c.failedQueue.AddRateLimited(pv.Name)
}
var pvNames []string
for _, pv := range batch {
pvNames = append(pvNames, pv.Name)
}
errorMessage := fmt.Sprintf("Error updating tags for volume %v: %v", pvNames, updateErr)
// Emit a warning event for the failure
c.eventRecorder.Warning("EBSVolumeTagsUpdateFailed", fmt.Sprintf("Failed to update tags for batch %v: %v", pvNames, errorMessage))
}
@@ -379,17 +342,28 @@ func newAndUpdatedTags(resourceTags []configv1.AWSResourceTag) []*ec2.Tag {
return tags
}

func filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume {
var pvsToBeUpdated = make([]*v1.PersistentVolume, 0)
func (c *EBSVolumeTagsController) filterUpdatableVolumes(volumes []*v1.PersistentVolume, newTagsHash string) []*v1.PersistentVolume {
var updatablePVs []*v1.PersistentVolume

for _, volume := range volumes {
if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == driverName {
existingHash := getPVTagHash(volume)
if existingHash == "" || existingHash != newTagsHash {
pvsToBeUpdated = append(pvsToBeUpdated, volume)
}
// Check if the volume is a CSI volume with the correct driver
if volume.Spec.CSI != nil && volume.Spec.CSI.Driver == driverName &&
// Ensure the volume is not already in the failed queue
!c.isVolumeInFailedQueue(volume.Name) &&
// Include volumes whose tag hash is missing or different from the new hash
getPVTagHash(volume) != newTagsHash {

// Add the volume to the list of updatable volumes
updatablePVs = append(updatablePVs, volume)
}
}
return pvsToBeUpdated
return updatablePVs
}

// isVolumeInFailedQueue checks if a volume name is currently in the failed queue
func (c *EBSVolumeTagsController) isVolumeInFailedQueue(volumeName string) bool {
// Check if the volume name is in the failed queue
return c.failedQueue.NumRequeues(volumeName) > 0
}
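NumRequeues(name) > 0 means the volume was re-enqueued after a failure and has not been Forget()-ed yet, so filterUpdatableVolumes skips it rather than racing a pending retry. The drain side of the queue is not shown in this diff; a hedged sketch of what it could look like, with retryOne hypothetical (not part of this commit):

name, shutdown := c.failedQueue.Get()
if shutdown {
	return
}
defer c.failedQueue.Done(name)
if err := retryOne(name); err != nil {
	c.failedQueue.AddRateLimited(name) // back off further
	return
}
c.failedQueue.Forget(name) // success: the volume becomes updatable again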

func pvsToResourceIDs(volumes []*v1.PersistentVolume) []*string {
@@ -401,14 +375,19 @@ func pvsToResourceIDs(volumes []*v1.PersistentVolume) []*string {
}

// setPVTagHash stores the hash in the PV annotations.
func setPVTagHash(pv *v1.PersistentVolume, hash string) {
func setPVTagHash(pv *v1.PersistentVolume, hash string) *v1.PersistentVolume {
// Create a deep copy of the PersistentVolume to avoid modifying the cached object
pvCopy := pv.DeepCopy()

// Ensure the PV has an annotations map
if pv.Annotations == nil {
pv.Annotations = make(map[string]string)
if pvCopy.Annotations == nil {
pvCopy.Annotations = make(map[string]string)
}

// Set or update the tag hash annotation
pv.Annotations[tagHashAnnotationKey] = hash
pvCopy.Annotations[tagHashAnnotationKey] = hash

return pvCopy
}
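computeTagsHash falls outside the displayed hunks; given the sha256, hex, and sort imports at the top of the file, a plausible shape is a digest over a canonically sorted key=value encoding, so reordering tags never changes the hash. A sketch only, not the committed implementation (strings.Join is an extra assumed import):

func computeTagsHashSketch(resourceTags []configv1.AWSResourceTag) string {
	pairs := make([]string, 0, len(resourceTags))
	for _, tag := range resourceTags {
		pairs = append(pairs, tag.Key+"="+tag.Value)
	}
	sort.Strings(pairs) // canonical order: reordered tags hash identically
	sum := sha256.Sum256([]byte(strings.Join(pairs, ";")))
	return hex.EncodeToString(sum[:])
}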

// getPVTagHash gets the hash stored in the PV annotations.
