Skip to content

Commit

Permalink
Clean up managed resources when disabled (#255)
Browse files Browse the repository at this point in the history
* Reconciler now removes un-used managed resources for CWA collector
  • Loading branch information
okankoAMZ authored Oct 24, 2024
1 parent 313843f commit 95f77b0
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 6 deletions.
79 changes: 78 additions & 1 deletion controllers/amazoncloudwatchagent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/config"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/manifestutils"
collectorStatus "github.com/aws/amazon-cloudwatch-agent-operator/internal/status/collector"
)

Expand All @@ -40,6 +43,79 @@ type Params struct {
Config config.Config
}

func (r *AmazonCloudWatchAgentReconciler) findCloudWatchAgentOwnedObjects(ctx context.Context, owner v1alpha1.AmazonCloudWatchAgent) (map[types.UID]client.Object, error) {
// Define a map to store the owned objects
ownedObjects := make(map[types.UID]client.Object)
selector := manifestutils.SelectorLabelsForAllOperatorManaged(owner.ObjectMeta)
listOps := &client.ListOptions{
Namespace: owner.Namespace,
LabelSelector: labels.SelectorFromSet(selector),
}
// Define lists for different Kubernetes resources
configMapList := &corev1.ConfigMapList{}
serviceList := &corev1.ServiceList{}
serviceAccountList := &corev1.ServiceAccountList{}
deploymentList := &appsv1.DeploymentList{}
statefulSetList := &appsv1.StatefulSetList{}
daemonSetList := &appsv1.DaemonSetList{}
var err error

// List ConfigMaps
err = r.List(ctx, configMapList, listOps)
if err != nil {
return nil, err
}
for i := range configMapList.Items {
ownedObjects[configMapList.Items[i].GetUID()] = &configMapList.Items[i]
}

// List Services
err = r.List(ctx, serviceList, listOps)
if err != nil {
return nil, err
}
for i := range serviceList.Items {
ownedObjects[serviceList.Items[i].GetUID()] = &serviceList.Items[i]
}
// List ServiceAccounts
err = r.List(ctx, serviceAccountList, listOps)
if err != nil {
return nil, err
}
for i := range serviceAccountList.Items {
ownedObjects[serviceAccountList.Items[i].GetUID()] = &serviceAccountList.Items[i]
}

// List Deployments
err = r.List(ctx, deploymentList, listOps)
if err != nil {
return nil, err
}
for i := range deploymentList.Items {
ownedObjects[deploymentList.Items[i].GetUID()] = &deploymentList.Items[i]
}

// List StatefulSets
err = r.List(ctx, statefulSetList, listOps)
if err != nil {
return nil, err
}
for i := range statefulSetList.Items {
ownedObjects[statefulSetList.Items[i].GetUID()] = &statefulSetList.Items[i]
}

// List DaemonSets
err = r.List(ctx, daemonSetList, listOps)
if err != nil {
return nil, err
}
for i := range daemonSetList.Items {
ownedObjects[daemonSetList.Items[i].GetUID()] = &daemonSetList.Items[i]
}

return ownedObjects, nil

}
func (r *AmazonCloudWatchAgentReconciler) getParams(instance v1alpha1.AmazonCloudWatchAgent) manifests.Params {
return manifests.Params{
Config: r.config,
Expand Down Expand Up @@ -108,7 +184,8 @@ func (r *AmazonCloudWatchAgentReconciler) Reconcile(ctx context.Context, req ctr
if buildErr != nil {
return ctrl.Result{}, buildErr
}
err := reconcileDesiredObjects(ctx, r.Client, log, &params.OtelCol, params.Scheme, desiredObjects...)

err := reconcileDesiredObjectsWPrune(ctx, r.Client, log, params.OtelCol, params.Scheme, desiredObjects, r.findCloudWatchAgentOwnedObjects)
return collectorStatus.HandleReconcileStatus(ctx, log, params, err)
}

Expand Down
65 changes: 60 additions & 5 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -56,10 +57,12 @@ func BuildCollector(params manifests.Params) ([]client.Object, error) {
}
return resources, nil
}

// reconcileDesiredObjects runs the reconcile process using the mutateFn over the given list of objects.
func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, owner metav1.Object, scheme *runtime.Scheme, desiredObjects ...client.Object) error {
func reconcileDesiredObjectUIDs(ctx context.Context, kubeClient client.Client, logger logr.Logger,
owner metav1.Object, scheme *runtime.Scheme, desiredObjects ...client.Object) (map[types.UID]client.Object, error) {
var errs []error
existingObjectMap := make(map[types.UID]client.Object)
var existingObjectList []client.Object

for _, desired := range desiredObjects {
l := logger.WithValues(
"object_name", desired.GetName(),
Expand All @@ -76,6 +79,8 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
// existing is an object the controller runtime will hydrate for us
// we obtain the existing object by deep copying the desired object because it's the most convenient way
existing := desired.DeepCopyObject().(client.Object)
existingObjectList = append(existingObjectList, existing) //uid are not assigned yet

mutateFn := manifests.MutateFuncFor(existing, desired)
var op controllerutil.OperationResult
crudErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Expand All @@ -87,7 +92,7 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
l.Error(crudErr, "detected immutable field change, trying to delete, new object will be created on next reconcile", "existing", existing.GetName())
delErr := kubeClient.Delete(ctx, existing)
if delErr != nil {
return delErr
return nil, delErr
}
continue
} else if crudErr != nil {
Expand All @@ -99,11 +104,61 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
l.V(1).Info(fmt.Sprintf("desired has been %s", op))
}
if len(errs) > 0 {
return fmt.Errorf("failed to create objects for %s: %w", owner.GetName(), errors.Join(errs...))
return nil, fmt.Errorf("failed to create objects for %s: %w", owner.GetName(), errors.Join(errs...))
}
for _, obj := range existingObjectList {
existingObjectMap[obj.GetUID()] = obj
}
return existingObjectMap, nil
}

func reconcileDesiredObjectsWPrune(ctx context.Context, kubeClient client.Client, logger logr.Logger, owner v1alpha1.AmazonCloudWatchAgent, scheme *runtime.Scheme,
desiredObjects []client.Object,
searchOwnedObjectsFunc func(ctx context.Context, owner v1alpha1.AmazonCloudWatchAgent) (map[types.UID]client.Object, error),
) error {
previouslyOwnedObjects, err := searchOwnedObjectsFunc(ctx, owner)
if err != nil {
return fmt.Errorf("failed to search owned objects: %w", err)
}

desiredObjectMap, err := reconcileDesiredObjectUIDs(ctx, kubeClient, logger, &owner, scheme, desiredObjects...)

// Pruning owned objects in the cluster which are not should not be present after the reconciliation.
err = pruneStaleObjects(ctx, kubeClient, logger, previouslyOwnedObjects, desiredObjectMap)
if err != nil {
return fmt.Errorf("failed to prune objects for %s: %w", owner.GetName(), err)
}
return nil
}

// reconcileDesiredObjects runs the reconcile process using the mutateFn over the given list of objects.
func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, owner metav1.Object, scheme *runtime.Scheme, desiredObjects ...client.Object) error {
_, err := reconcileDesiredObjectUIDs(ctx, kubeClient, logger, owner, scheme, desiredObjects...)
return err
}

func pruneStaleObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, previouslyOwnedMap, desiredMap map[types.UID]client.Object) error {
// Pruning owned objects in the cluster which should not be present after the reconciliation.
var pruneErrs []error
for uid, obj := range previouslyOwnedMap {
l := logger.WithValues(
"object_name", obj.GetName(),
"object_kind", obj.GetObjectKind().GroupVersionKind().Kind,
)
if _, found := desiredMap[uid]; found {
continue
}

l.Info("pruning unmanaged resource")
err := kubeClient.Delete(ctx, obj)
if err != nil {
l.Error(err, "failed to delete resource")
pruneErrs = append(pruneErrs, err)
}
}
return errors.Join(pruneErrs...)
}

func enabledAcceleratedComputeByAgentConfig(ctx context.Context, c client.Client, log logr.Logger) bool {
agentResource := getAmazonCloudWatchAgentResource(ctx, c)
// missing feature flag means it's on by default
Expand Down
8 changes: 8 additions & 0 deletions internal/manifests/manifestutils/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ func SelectorLabels(instance metav1.ObjectMeta, component string) map[string]str
"app.kubernetes.io/component": component,
}
}

func SelectorLabelsForAllOperatorManaged(instance metav1.ObjectMeta) map[string]string {
return map[string]string{
"app.kubernetes.io/managed-by": "amazon-cloudwatch-agent-operator",
"app.kubernetes.io/instance": naming.Truncate("%s.%s", 63, instance.Namespace, instance.Name),
"app.kubernetes.io/part-of": "amazon-cloudwatch-agent",
}
}

0 comments on commit 95f77b0

Please sign in to comment.