From 10ebedeaf27d86e46a7d412c86bf74fe256cd755 Mon Sep 17 00:00:00 2001
From: nb-ohad <mitrani.ohad@gmail.com>
Date: Mon, 29 Jul 2024 18:13:51 +0300
Subject: [PATCH 1/4] ClientProfile Controller: Rename controller to match with
 new naming scheme

Signed-off-by: nb-ohad <mitrani.ohad@gmail.com>
---
 cmd/main.go                                      |  4 ++--
 config/rbac/role.yaml                            |  6 +++---
 ...controller.go => clientprofile_controller.go} | 16 ++++++++--------
 ..._test.go => clientprofile_controller_test.go} | 12 ++++++------
 4 files changed, 19 insertions(+), 19 deletions(-)
 rename internal/controller/{config_controller.go => clientprofile_controller.go} (70%)
 rename internal/controller/{config_controller_test.go => clientprofile_controller_test.go} (86%)

diff --git a/cmd/main.go b/cmd/main.go
index 0eda4bfb..97228a5e 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -129,11 +129,11 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "Driver")
 		os.Exit(1)
 	}
-	if err = (&controller.ConfigReconciler{
+	if err = (&controller.ClientProfileReconciler{
 		Client: mgr.GetClient(),
 		Scheme: mgr.GetScheme(),
 	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "Config")
+		setupLog.Error(err, "unable to create controller", "controller", "ClientProfile")
 		os.Exit(1)
 	}
 	if err = (&controller.ClientProfileMappingReconciler{
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 85792978..d50a81c3 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -76,7 +76,7 @@ rules:
 - apiGroups:
   - csi.ceph.io
   resources:
-  - configs
+  - clientprofiles
   verbs:
   - create
   - delete
@@ -88,13 +88,13 @@ rules:
 - apiGroups:
   - csi.ceph.io
   resources:
-  - configs/finalizers
+  - clientprofiles/finalizers
   verbs:
   - update
 - apiGroups:
   - csi.ceph.io
   resources:
-  - configs/status
+  - clientprofiles/status
   verbs:
   - get
   - patch
diff --git a/internal/controller/config_controller.go b/internal/controller/clientprofile_controller.go
similarity index 70%
rename from internal/controller/config_controller.go
rename to internal/controller/clientprofile_controller.go
index f90a1010..503b42e2 100644
--- a/internal/controller/config_controller.go
+++ b/internal/controller/clientprofile_controller.go
@@ -27,26 +27,26 @@ import (
 	csiv1alpha1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
 )
 
-// ConfigReconciler reconciles a Config object
-type ConfigReconciler struct {
+// ClientProfileReconciler reconciles a ClientProfile object
+type ClientProfileReconciler struct {
 	client.Client
 	Scheme *runtime.Scheme
 }
 
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=configs,verbs=get;list;watch;create;update;patch;delete
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=configs/status,verbs=get;update;patch
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=configs/finalizers,verbs=update
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/finalizers,verbs=update
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
 // TODO(user): Modify the Reconcile function to compare the state specified by
-// the Config object against the actual cluster state, and then
+// the ClientProfile object against the actual cluster state, and then
 // perform operations to make the cluster state reflect the state specified by
 // the user.
 //
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
-func (r *ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *ClientProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	_ = log.FromContext(ctx)
 
 	// TODO(user): your logic here
@@ -55,7 +55,7 @@ func (r *ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
 }
 
 // SetupWithManager sets up the controller with the Manager.
-func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
+func (r *ClientProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&csiv1alpha1.ClientProfile{}).
 		Complete(r)
diff --git a/internal/controller/config_controller_test.go b/internal/controller/clientprofile_controller_test.go
similarity index 86%
rename from internal/controller/config_controller_test.go
rename to internal/controller/clientprofile_controller_test.go
index fc945916..5c762600 100644
--- a/internal/controller/config_controller_test.go
+++ b/internal/controller/clientprofile_controller_test.go
@@ -30,7 +30,7 @@ import (
 	csiv1alpha1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
 )
 
-var _ = Describe("Config Controller", func() {
+var _ = Describe("ClientProfile Controller", func() {
 	Context("When reconciling a resource", func() {
 		const resourceName = "test-resource"
 
@@ -40,11 +40,11 @@ var _ = Describe("Config Controller", func() {
 			Name:      resourceName,
 			Namespace: "default", // TODO(user):Modify as needed
 		}
-		config := &csiv1alpha1.ClientProfile{}
+		clientProfile := &csiv1alpha1.ClientProfile{}
 
 		BeforeEach(func() {
-			By("creating the custom resource for the Kind Config")
-			err := k8sClient.Get(ctx, typeNamespacedName, config)
+			By("creating the custom resource for the Kind ClientProfile")
+			err := k8sClient.Get(ctx, typeNamespacedName, clientProfile)
 			if err != nil && errors.IsNotFound(err) {
 				resource := &csiv1alpha1.ClientProfile{
 					ObjectMeta: metav1.ObjectMeta{
@@ -63,12 +63,12 @@ var _ = Describe("Config Controller", func() {
 			err := k8sClient.Get(ctx, typeNamespacedName, resource)
 			Expect(err).NotTo(HaveOccurred())
 
-			By("Cleanup the specific resource instance Config")
+			By("Cleanup the specific resource instance ClientProfile")
 			Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
 		})
 		It("should successfully reconcile the resource", func() {
 			By("Reconciling the created resource")
-			controllerReconciler := &ConfigReconciler{
+			controllerReconciler := &ClientProfileReconciler{
 				Client: k8sClient,
 				Scheme: k8sClient.Scheme(),
 			}

From 8cafb64b422485bd56fa36b88178442501d3575c Mon Sep 17 00:00:00 2001
From: nb-ohad <mitrani.ohad@gmail.com>
Date: Thu, 18 Jul 2024 22:13:16 +0300
Subject: [PATCH 2/4] ClientProfile Controller: Load and validate client
 profile object and related ceph connection

Signed-off-by: nb-ohad <mitrani.ohad@gmail.com>
---
 config/rbac/role.yaml                         |   9 ++
 .../controller/clientprofile_controller.go    | 119 ++++++++++++++----
 2 files changed, 107 insertions(+), 21 deletions(-)

diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index d50a81c3..4760133e 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -47,6 +47,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - csi.ceph.io
+  resources:
+  - cephconnections
+  verbs:
+  - get
+  - list
+  - update
+  - watch
 - apiGroups:
   - csi.ceph.io
   resources:
diff --git a/internal/controller/clientprofile_controller.go b/internal/controller/clientprofile_controller.go
index 503b42e2..b1438833 100644
--- a/internal/controller/clientprofile_controller.go
+++ b/internal/controller/clientprofile_controller.go
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,45 +18,122 @@ package controller
 
 import (
 	"context"
+	"fmt"
 
+	"github.com/go-logr/logr"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/log"
+	ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
-	csiv1alpha1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
+	csiv1a1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
 )
 
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/finalizers,verbs=update
+//+kubebuilder:rbac:groups=csi.ceph.io,resources=cephconnections,verbs=get;list;watch;update
+
 // ClientProfileReconciler reconciles a ClientProfile object
 type ClientProfileReconciler struct {
 	client.Client
 	Scheme *runtime.Scheme
 }
 
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles,verbs=get;list;watch;create;update;patch;delete
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/status,verbs=get;update;patch
-//+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/finalizers,verbs=update
+// A local reconcile object tied to a single reconcile iteration
+type ClientProfileReconcile struct {
+	ClientProfileReconciler
+
+	ctx           context.Context
+	log           logr.Logger
+	clientProfile csiv1a1.ClientProfile
+	cephConn      csiv1a1.CephConnection
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *ClientProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	// Filter update events based on metadata.generation changes, will filter events
+	// for non-spec changes on most resource types.
+	genChangedPredicate := predicate.GenerationChangedPredicate{}
+
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&csiv1a1.ClientProfile{}).
+		Owns(
+			&csiv1a1.CephConnection{},
+			builder.MatchEveryOwner,
+			builder.WithPredicates(genChangedPredicate),
+		).
+		Complete(r)
+}
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the ClientProfile object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
-//
-// For more details, check Reconcile and its Result here:
-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
 func (r *ClientProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	_ = log.FromContext(ctx)
+	log := ctrllog.FromContext(ctx)
+	log.Info("Starting reconcile iteration for ClientProfile", "req", req)
 
-	// TODO(user): your logic here
+	reconcileHandler := ClientProfileReconcile{}
+	reconcileHandler.ClientProfileReconciler = *r
+	reconcileHandler.ctx = ctx
+	reconcileHandler.log = log
+	reconcileHandler.clientProfile.Name = req.Name
+	reconcileHandler.clientProfile.Namespace = req.Namespace
 
-	return ctrl.Result{}, nil
+	err := reconcileHandler.reconcile()
+	if err != nil {
+		log.Error(err, "ClientProfile reconciliation failed")
+	} else {
+		log.Info("ClientProfile reconciliation completed successfully")
+	}
+	return ctrl.Result{}, err
 }
 
-// SetupWithManager sets up the controller with the Manager.
-func (r *ClientProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	return ctrl.NewControllerManagedBy(mgr).
-		For(&csiv1alpha1.ClientProfile{}).
-		Complete(r)
+func (r *ClientProfileReconcile) reconcile() error {
+	// Load the ClientProfile
+	if err := r.Get(r.ctx, client.ObjectKeyFromObject(&r.clientProfile), &r.clientProfile); err != nil {
+		r.log.Error(err, "Failed loading ClientProfile")
+		return err
+	}
+
+	// Validate a pointer to a ceph connection
+	if r.clientProfile.Spec.CephConnectionRef.Name == "" {
+		err := fmt.Errorf("validation error")
+		r.log.Error(err, "Invalid ClientProfile, missing .spec.cephConnectionRef.name")
+		return err
+	}
+
+	// Load the ceph connection
+	r.cephConn.Name = r.clientProfile.Spec.CephConnectionRef.Name
+	r.cephConn.Namespace = r.clientProfile.Namespace
+	if err := r.Get(r.ctx, client.ObjectKeyFromObject(&r.cephConn), &r.cephConn); err != nil {
+		r.log.Error(err, "Failed loading CephConnection")
+		return err
+	}
+
+	// Ensure the CephConnection has an owner reference (not controller reference)
+	// for the current reconciled ClientProfile
+	connHasOwnerRef := false
+	for i := range r.cephConn.OwnerReferences {
+		ownerRef := &r.cephConn.OwnerReferences[i]
+		if ownerRef.UID == r.clientProfile.UID {
+			connHasOwnerRef = true
+			break
+		}
+	}
+	if !connHasOwnerRef {
+		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &r.cephConn, r.Scheme); err != nil {
+			r.log.Error(err, "Failed adding an owner reference on CephConnection")
+			return err
+		}
+		r.log.Info("Owner reference missing on CephConnection, adding")
+		if err := r.Update(r.ctx, &r.cephConn); err != nil {
+			r.log.Error(err, "Failed adding an owner reference to CephConnection")
+			return err
+		}
+	}
+
+	return nil
 }

From 2a25c9d1e23f6ec0ed7e64c07532bf2eed7e5ff0 Mon Sep 17 00:00:00 2001
From: nb-ohad <mitrani.ohad@gmail.com>
Date: Fri, 19 Jul 2024 01:58:21 +0300
Subject: [PATCH 3/4] ClientProfile Controller: Create/Update a Ceph CSI config
 entry for ClientProfile resource

Signed-off-by: nb-ohad <mitrani.ohad@gmail.com>
---
 api/v1alpha1/cephconnection_types.go          |   2 +-
 api/v1alpha1/zz_generated.deepcopy.go         |   6 +-
 config/rbac/role.yaml                         |   5 +
 .../controller/clientprofile_controller.go    | 163 +++++++++++++++++-
 internal/utils/csi.go                         |   2 +
 internal/utils/meta.go                        |  15 +-
 6 files changed, 183 insertions(+), 10 deletions(-)

diff --git a/api/v1alpha1/cephconnection_types.go b/api/v1alpha1/cephconnection_types.go
index fc537ac3..26e16cf9 100644
--- a/api/v1alpha1/cephconnection_types.go
+++ b/api/v1alpha1/cephconnection_types.go
@@ -34,7 +34,7 @@ type CephConnectionSpec struct {
 	Monitors []string `json:"monitors"`
 
 	//+kubebuilder:validation:Optional
-	ReadAffinity ReadAffinitySpec `json:"readAffinity,omitempty"`
+	ReadAffinity *ReadAffinitySpec `json:"readAffinity,omitempty"`
 
 	//+kubebuilder:validation:Optional
 	//+kubebuilder:validation:Minimum:=1
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 0856df76..e5160343 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -125,7 +125,11 @@ func (in *CephConnectionSpec) DeepCopyInto(out *CephConnectionSpec) {
 		*out = make([]string, len(*in))
 		copy(*out, *in)
 	}
-	in.ReadAffinity.DeepCopyInto(&out.ReadAffinity)
+	if in.ReadAffinity != nil {
+		in, out := &in.ReadAffinity, &out.ReadAffinity
+		*out = new(ReadAffinitySpec)
+		(*in).DeepCopyInto(*out)
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CephConnectionSpec.
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4760133e..f4a69756 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -9,8 +9,13 @@ rules:
   resources:
   - configmaps
   verbs:
+  - create
+  - delete
   - get
   - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - ""
   resources:
diff --git a/internal/controller/clientprofile_controller.go b/internal/controller/clientprofile_controller.go
index b1438833..810ef25c 100644
--- a/internal/controller/clientprofile_controller.go
+++ b/internal/controller/clientprofile_controller.go
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
-http://www.apache.org/licenses/LICENSE-2.0
+    http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,13 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"slices"
+	"sync"
 
 	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -30,12 +34,14 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
 	csiv1a1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
+	"github.com/ceph/ceph-csi-operator/internal/utils"
 )
 
 //+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=csi.ceph.io,resources=clientprofiles/finalizers,verbs=update
 //+kubebuilder:rbac:groups=csi.ceph.io,resources=cephconnections,verbs=get;list;watch;update
+//+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
 
 // ClientProfileReconciler reconciles a ClientProfile object
 type ClientProfileReconciler struct {
@@ -53,6 +59,29 @@ type ClientProfileReconcile struct {
 	cephConn      csiv1a1.CephConnection
 }
 
+// csiClusterInfoRecord represents the structure of a serialized csi record
+// in Ceph CSI's config configmap
+type csiClusterInfoRecord struct {
+	ClusterId string   `json:"clusterID,omitempty"`
+	Monitors  []string `json:"monitors,omitempty"`
+	CephFs    struct {
+		SubvolumeGroup     string `json:"subvolumeGroup,omitempty"`
+		KernelMountOptions string `json:"kernelMountOptions"`
+		FuseMountOptions   string `json:"fuseMountOptions"`
+	} `json:"cephFS,omitempty"`
+	Rbd struct {
+		RadosNamespace string `json:"radosNamespace,omitempty"`
+		MirrorCount    int    `json:"mirrorCount,omitempty"`
+	} `json:"rbd,omitempty"`
+	Nfs          struct{} `json:"nfs,omitempty"`
+	ReadAffinity struct {
+		Enabled             bool     `json:"enabled,omitempty"`
+		CrushLocationLabels []string `json:"crushLocationLabels,omitempty"`
+	} `json:"readAffinity,omitempty"`
+}
+
+var configMapUpdateLock = sync.Mutex{}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *ClientProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	// Filter update events based on metadata.generation changes, will filter events
@@ -66,11 +95,14 @@ func (r *ClientProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			builder.MatchEveryOwner,
 			builder.WithPredicates(genChangedPredicate),
 		).
+		Owns(
+			&corev1.ConfigMap{},
+			builder.MatchEveryOwner,
+			builder.WithPredicates(genChangedPredicate),
+		).
 		Complete(r)
 }
 
-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
 func (r *ClientProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	log := ctrllog.FromContext(ctx)
 	log.Info("Starting reconcile iteration for ClientProfile", "req", req)
@@ -92,13 +124,38 @@ func (r *ClientProfileReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 }
 
 func (r *ClientProfileReconcile) reconcile() error {
+	if err := r.loadAndValidate(); err != nil {
+		return err
+	}
+
+	// Ensure the ceph connection resource has an owner reference (not controller reference)
+	// for the current reconciled config resource
+	if !utils.IsOwnedBy(&r.cephConn, &r.clientProfile) {
+		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &r.cephConn, r.Scheme); err != nil {
+			r.log.Error(err, "Failed adding an owner reference on CephConnection")
+			return err
+		}
+		if err := r.Update(r.ctx, &r.cephConn); err != nil {
+			r.log.Error(err, "Failed to update CephConnection")
+			return err
+		}
+	}
+
+	if err := r.reconcileCephCsiClusterInfo(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *ClientProfileReconcile) loadAndValidate() error {
 	// Load the ClientProfile
 	if err := r.Get(r.ctx, client.ObjectKeyFromObject(&r.clientProfile), &r.clientProfile); err != nil {
 		r.log.Error(err, "Failed loading ClientProfile")
 		return err
 	}
 
-	// Validate a pointer to a ceph connection
+	// Validate a pointer to a ceph cluster resource
 	if r.clientProfile.Spec.CephConnectionRef.Name == "" {
 		err := fmt.Errorf("validation error")
 		r.log.Error(err, "Invalid ClientProfile, missing .spec.cephConnectionRef.name")
@@ -115,15 +172,15 @@ func (r *ClientProfileReconcile) reconcile() error {
 
 	// Ensure the CephConnection has an owner reference (not controller reference)
 	// for the current reconciled ClientProfile
-	connHasOwnerRef := false
+	cephConnHasOwnerRef := false
 	for i := range r.cephConn.OwnerReferences {
 		ownerRef := &r.cephConn.OwnerReferences[i]
 		if ownerRef.UID == r.clientProfile.UID {
-			connHasOwnerRef = true
+			cephConnHasOwnerRef = true
 			break
 		}
 	}
-	if !connHasOwnerRef {
+	if !cephConnHasOwnerRef {
 		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &r.cephConn, r.Scheme); err != nil {
 			r.log.Error(err, "Failed adding an owner reference on CephConnection")
 			return err
@@ -137,3 +194,95 @@ func (r *ClientProfileReconcile) reconcile() error {
 
 	return nil
 }
+
+func (r *ClientProfileReconcile) reconcileCephCsiClusterInfo() error {
+	csiConfigMap := corev1.ConfigMap{}
+	csiConfigMap.Name = utils.CsiConfigVolume.Name
+	csiConfigMap.Namespace = r.clientProfile.Namespace
+
+	log := r.log.WithValues("csiConfigMapName", csiConfigMap.Name)
+	log.Info("Reconciling Ceph CSI Cluster Info")
+
+	// Creating the desired record in advance to minimize the amount of execution time
+	// the code runs while serializing access to the configmap
+	record := composeCsiClusterInfoRecord(&r.clientProfile, &r.cephConn)
+
+	// Using a lock to serialize the updating of the config map.
+	// Although the code will run perfectly fine without the lock, there will be a higher
+	// chance to fail on the create/update operation because another concurrent reconcile loop
+	// updated the config map which will result in a stale representation and an update failure.
+	// The locking strategy will sync all updates to the shared config file and will prevent such
+	// potential issues without a big impact on performance as a whole
+	configMapUpdateLock.Lock()
+	defer configMapUpdateLock.Unlock()
+
+	_, err := ctrlutil.CreateOrUpdate(r.ctx, r.Client, &csiConfigMap, func() error {
+		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &csiConfigMap, r.Scheme); err != nil {
+			log.Error(err, "Failed setting an owner reference on Ceph CSI config map")
+			return err
+		}
+
+		configsAsJson := csiConfigMap.Data[utils.CsiConfigMapConfigKey]
+		clusterInfoList := []*csiClusterInfoRecord{}
+
+		// parse the json serialized list into a go array
+		if configsAsJson != "" {
+			if err := json.Unmarshal([]byte(configsAsJson), &clusterInfoList); err != nil {
+				log.Error(err, "Failed to parse cluster info list under \"config.json\" key")
+				return err
+			}
+		}
+
+		// Locate an existing entry for the same config/cluster name if exists
+		index := slices.IndexFunc(clusterInfoList, func(record *csiClusterInfoRecord) bool {
+			return record.ClusterId == r.clientProfile.Name
+		})
+
+		// overwrite an existing entry or append a new one
+		if index > -1 {
+			clusterInfoList[index] = record
+		} else {
+			clusterInfoList = append(clusterInfoList, record)
+		}
+
+		// Serialize the list and store it back into the config map
+		if bytes, err := json.Marshal(clusterInfoList); err == nil {
+			if csiConfigMap.Data == nil {
+				csiConfigMap.Data = map[string]string{}
+			}
+			csiConfigMap.Data[utils.CsiConfigMapConfigKey] = string(bytes)
+			return nil
+		} else {
+			log.Error(err, "Failed to serialize cluster info list")
+			return err
+		}
+	})
+
+	return err
+}
+
+// composeCsiClusterInfoRecord composes the desired csi cluster info record for
+// a given ClientProfile and CephConnection specs
+func composeCsiClusterInfoRecord(clientProfile *csiv1a1.ClientProfile, cephConn *csiv1a1.CephConnection) *csiClusterInfoRecord {
+	record := csiClusterInfoRecord{}
+	record.ClusterId = clientProfile.Name
+	record.Monitors = cephConn.Spec.Monitors
+	if cephFs := clientProfile.Spec.CephFs; cephFs != nil {
+		record.CephFs.SubvolumeGroup = cephFs.SubVolumeGroup
+		if mountOpt := cephFs.KernelMountOptions; mountOpt != nil {
+			record.CephFs.KernelMountOptions = utils.MapToString(mountOpt, "=", ",")
+		}
+		if mountOpt := cephFs.FuseMountOptions; mountOpt != nil {
+			record.CephFs.FuseMountOptions = utils.MapToString(mountOpt, "=", ",")
+		}
+	}
+	if rbd := clientProfile.Spec.Rbd; rbd != nil {
+		record.Rbd.RadosNamespace = rbd.RadosNamespace
+		record.Rbd.MirrorCount = cephConn.Spec.RbdMirrorDaemonCount
+	}
+	if readAffinity := cephConn.Spec.ReadAffinity; readAffinity != nil {
+		record.ReadAffinity.Enabled = true
+		record.ReadAffinity.CrushLocationLabels = readAffinity.CrushLocationLabels
+	}
+	return &record
+}
diff --git a/internal/utils/csi.go b/internal/utils/csi.go
index 5c558255..2135d167 100644
--- a/internal/utils/csi.go
+++ b/internal/utils/csi.go
@@ -33,6 +33,8 @@ const (
 	pluginDirVolumeName      = "plugin-dir"
 	podsMountDirVolumeName   = "pods-mount-dir"
 	pluginMountDirVolumeName = "plugin-mount-dir"
+
+	CsiConfigMapConfigKey = "config.json"
 )
 
 // Ceph CSI common volumes
diff --git a/internal/utils/meta.go b/internal/utils/meta.go
index 0980139b..851c0ea6 100644
--- a/internal/utils/meta.go
+++ b/internal/utils/meta.go
@@ -16,7 +16,9 @@ limitations under the License.
 
 package utils
 
-import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
 
 // AddAnnotation adds an annotation to a resource metadata, returns true if added else false
 func AddAnnotation(obj metav1.Object, key string, value string) bool {
@@ -31,3 +33,14 @@ func AddAnnotation(obj metav1.Object, key string, value string) bool {
 	}
 	return false
 }
+
+// IsOwnedBy returns true if the object has an owner ref for the provided owner
+func IsOwnedBy(obj, owner metav1.Object) bool {
+	ownerRefs := obj.GetOwnerReferences()
+	for i := range ownerRefs {
+		if owner.GetUID() == ownerRefs[i].UID {
+			return true
+		}
+	}
+	return false
+}

From 85dda606eab3e86dc6dae3e275b191528977524d Mon Sep 17 00:00:00 2001
From: nb-ohad <mitrani.ohad@gmail.com>
Date: Sun, 21 Jul 2024 23:18:32 +0300
Subject: [PATCH 4/4] ClientProfile Controller: Add ClientProfile deletion and
 cleanup logic

Signed-off-by: nb-ohad <mitrani.ohad@gmail.com>
---
 .../controller/clientprofile_controller.go    | 92 ++++++++++++++-----
 internal/utils/meta.go                        | 23 +++++
 2 files changed, 91 insertions(+), 24 deletions(-)

diff --git a/internal/controller/clientprofile_controller.go b/internal/controller/clientprofile_controller.go
index 810ef25c..581229b4 100644
--- a/internal/controller/clientprofile_controller.go
+++ b/internal/controller/clientprofile_controller.go
@@ -57,6 +57,7 @@ type ClientProfileReconcile struct {
 	log           logr.Logger
 	clientProfile csiv1a1.ClientProfile
 	cephConn      csiv1a1.CephConnection
+	cleanUp       bool
 }
 
 // csiClusterInfoRecord represents the structure of a serialized csi record
@@ -80,6 +81,10 @@ type csiClusterInfoRecord struct {
 	} `json:"readAffinity,omitempty"`
 }
 
+const (
+	cleanupFinalizer = "csi.ceph.com/cleanup"
+)
+
 var configMapUpdateLock = sync.Mutex{}
 
 // SetupWithManager sets up the controller with the Manager.
@@ -128,23 +133,29 @@ func (r *ClientProfileReconcile) reconcile() error {
 		return err
 	}
 
-	// Ensure the ceph connection resource has an owner reference (not controller reference)
-	// for the current reconciled config resource
-	if !utils.IsOwnedBy(&r.cephConn, &r.clientProfile) {
-		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &r.cephConn, r.Scheme); err != nil {
-			r.log.Error(err, "Failed adding an owner reference on CephConnection")
-			return err
-		}
-		if err := r.Update(r.ctx, &r.cephConn); err != nil {
-			r.log.Error(err, "Failed to update CephConnection")
+	// Ensure a finalizer on the ClientProfile to allow proper clean up
+	if ctrlutil.AddFinalizer(&r.clientProfile, cleanupFinalizer) {
+		if err := r.Update(r.ctx, &r.clientProfile); err != nil {
+			r.log.Error(err, "Failed to add a cleanup finalizer on ClientProfile")
 			return err
 		}
 	}
 
+	if err := r.reconcileCephConnection(); err != nil {
+		return err
+	}
 	if err := r.reconcileCephCsiClusterInfo(); err != nil {
 		return err
 	}
 
+	if r.cleanUp {
+		ctrlutil.RemoveFinalizer(&r.clientProfile, cleanupFinalizer)
+		if err := r.Update(r.ctx, &r.clientProfile); err != nil {
+			r.log.Error(err, "Failed to add a cleanup finalizer on config resource")
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -154,6 +165,7 @@ func (r *ClientProfileReconcile) loadAndValidate() error {
 		r.log.Error(err, "Failed loading ClientProfile")
 		return err
 	}
+	r.cleanUp = r.clientProfile.DeletionTimestamp != nil
 
 	// Validate a pointer to a ceph cluster resource
 	if r.clientProfile.Spec.CephConnectionRef.Name == "" {
@@ -195,6 +207,28 @@ func (r *ClientProfileReconcile) loadAndValidate() error {
 	return nil
 }
 
+func (r *ClientProfileReconcile) reconcileCephConnection() error {
+	log := r.log.WithValues("cephConnectionName", r.cephConn.Name)
+	log.Info("Reconciling CephConnection")
+
+	if needsUpdate, err := utils.ToggleOwnerReference(
+		!r.cleanUp,
+		&r.clientProfile,
+		&r.cephConn,
+		r.Scheme,
+	); err != nil {
+		r.log.Error(err, "Failed to toggle owner reference on CephConnection")
+		return err
+	} else if needsUpdate {
+		if err := r.Update(r.ctx, &r.cephConn); err != nil {
+			r.log.Error(err, "Failed to update CephConnection")
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (r *ClientProfileReconcile) reconcileCephCsiClusterInfo() error {
 	csiConfigMap := corev1.ConfigMap{}
 	csiConfigMap.Name = utils.CsiConfigVolume.Name
@@ -203,10 +237,6 @@ func (r *ClientProfileReconcile) reconcileCephCsiClusterInfo() error {
 	log := r.log.WithValues("csiConfigMapName", csiConfigMap.Name)
 	log.Info("Reconciling Ceph CSI Cluster Info")
 
-	// Creating the desired record in advance to minimize the amount of execution time
-	// the code runs while serializing access to the configmap
-	record := composeCsiClusterInfoRecord(&r.clientProfile, &r.cephConn)
-
 	// Using a lock to serialize the updating of the config map.
 	// Although the code will run perfectly fine without the lock, there will be a higher
 	// chance to fail on the create/update operation because another concurrent reconcile loop
@@ -217,8 +247,13 @@ func (r *ClientProfileReconcile) reconcileCephCsiClusterInfo() error {
 	defer configMapUpdateLock.Unlock()
 
 	_, err := ctrlutil.CreateOrUpdate(r.ctx, r.Client, &csiConfigMap, func() error {
-		if err := ctrlutil.SetOwnerReference(&r.clientProfile, &csiConfigMap, r.Scheme); err != nil {
-			log.Error(err, "Failed setting an owner reference on Ceph CSI config map")
+		if _, err := utils.ToggleOwnerReference(
+			!r.cleanUp,
+			&r.clientProfile,
+			&csiConfigMap,
+			r.Scheme,
+		); err != nil {
+			log.Error(err, "Failed toggling owner reference for Ceph CSI config map")
 			return err
 		}
 
@@ -238,23 +273,32 @@ func (r *ClientProfileReconcile) reconcileCephCsiClusterInfo() error {
 			return record.ClusterId == r.clientProfile.Name
 		})
 
-		// overwrite an existing entry or append a new one
-		if index > -1 {
-			clusterInfoList[index] = record
-		} else {
-			clusterInfoList = append(clusterInfoList, record)
+		if !r.cleanUp {
+			// Overwrite an existing entry or append a new one
+			record := composeCsiClusterInfoRecord(&r.clientProfile, &r.cephConn)
+			if index > -1 {
+				clusterInfoList[index] = record
+			} else {
+				clusterInfoList = append(clusterInfoList, record)
+			}
+		} else if index > -1 {
+			// An O(1) unordered in-place delete of a record
+			// Will not shrink the capacity of the slice
+			length := len(clusterInfoList)
+			clusterInfoList[index] = clusterInfoList[length-1]
+			clusterInfoList = clusterInfoList[:length-1]
 		}
 
 		// Serialize the list and store it back into the config map
-		if bytes, err := json.Marshal(clusterInfoList); err == nil {
+		if bytes, err := json.Marshal(clusterInfoList); err != nil {
+			log.Error(err, "Failed to serialize cluster info list")
+			return err
+		} else {
 			if csiConfigMap.Data == nil {
 				csiConfigMap.Data = map[string]string{}
 			}
 			csiConfigMap.Data[utils.CsiConfigMapConfigKey] = string(bytes)
 			return nil
-		} else {
-			log.Error(err, "Failed to serialize cluster info list")
-			return err
 		}
 	})
 
diff --git a/internal/utils/meta.go b/internal/utils/meta.go
index 851c0ea6..6f48e01e 100644
--- a/internal/utils/meta.go
+++ b/internal/utils/meta.go
@@ -18,6 +18,8 @@ package utils
 
 import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 )
 
 // AddAnnotation adds an annotation to a resource metadata, returns true if added else false
@@ -44,3 +46,24 @@ func IsOwnedBy(obj, owner metav1.Object) bool {
 	}
 	return false
 }
+
+// ToggleOwnerReference adds or removes an owner reference for the given owner based on the first argument.
+// The function returns true if the owner reference list has changed and false if it didn't
+func ToggleOwnerReference(on bool, obj, owner metav1.Object, scheme *runtime.Scheme) (bool, error) {
+	ownerRefExists := IsOwnedBy(obj, owner)
+
+	if on {
+		if !ownerRefExists {
+			if err := ctrlutil.SetOwnerReference(obj, owner, scheme); err != nil {
+				return false, err
+			}
+			return true, nil
+		}
+	} else if ownerRefExists {
+		if err := ctrlutil.RemoveOwnerReference(obj, owner, scheme); err != nil {
+			return false, err
+		}
+		return true, nil
+	}
+	return false, nil
+}