From f013745e58eed975e39941e289b9a26fab89e14f Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Mon, 9 Dec 2024 13:26:32 +0800 Subject: [PATCH] Add cluster decorator interface in register And refactor creating to controller to call decorators Signed-off-by: Jian Qiu --- .../register/aws_irsa/aws_irsa.go | 6 + pkg/registration/register/csr/csr.go | 6 + pkg/registration/register/interface.go | 3 + .../register/secret_controller_test.go | 8 +- .../spoke/registration/creating_controller.go | 103 ++++++++++-------- .../registration/creating_controller_test.go | 13 ++- pkg/registration/spoke/spokeagent.go | 8 +- 7 files changed, 90 insertions(+), 57 deletions(-) diff --git a/pkg/registration/register/aws_irsa/aws_irsa.go b/pkg/registration/register/aws_irsa/aws_irsa.go index 2ba57c86b..75557a852 100644 --- a/pkg/registration/register/aws_irsa/aws_irsa.go +++ b/pkg/registration/register/aws_irsa/aws_irsa.go @@ -13,6 +13,8 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/klog/v2" + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/ocm/pkg/registration/register" ) @@ -102,6 +104,10 @@ func (c *AWSIRSADriver) IsHubKubeConfigValid(ctx context.Context, secretOption r return true, nil } +func (c *AWSIRSADriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster { + return cluster +} + func NewAWSIRSADriver() register.RegisterDriver { return &AWSIRSADriver{} } diff --git a/pkg/registration/register/csr/csr.go b/pkg/registration/register/csr/csr.go index c43c8fa26..c8fb13f7b 100644 --- a/pkg/registration/register/csr/csr.go +++ b/pkg/registration/register/csr/csr.go @@ -22,6 +22,8 @@ import ( "k8s.io/client-go/util/keyutil" "k8s.io/klog/v2" + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/ocm/pkg/registration/register" ) @@ -299,6 +301,10 @@ func (c *CSRDriver) IsHubKubeConfigValid(ctx context.Context, secretOption regis return isCertificateValid(logger, certData, nil) } +func (c *CSRDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster { + return cluster +} + func NewCSRDriver() register.RegisterDriver { return &CSRDriver{} } diff --git a/pkg/registration/register/interface.go b/pkg/registration/register/interface.go index 2ec3b09e6..00650909a 100644 --- a/pkg/registration/register/interface.go +++ b/pkg/registration/register/interface.go @@ -71,6 +71,9 @@ type RegisterDriver interface { // InformerHandler returns informer of the related object. If no object needs to be watched, the func could // return nil, nil. InformerHandler(option any) (cache.SharedIndexInformer, factory.EventFilterFunc) + + // ManagedClusterDecorator is to change managed cluster metadata or spec during registration process. + ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster } // Approvers is the inteface that each driver should implement on hub side. The hub controller will use this driver diff --git a/pkg/registration/register/secret_controller_test.go b/pkg/registration/register/secret_controller_test.go index 26a31aaf7..3a31fe5ab 100644 --- a/pkg/registration/register/secret_controller_test.go +++ b/pkg/registration/register/secret_controller_test.go @@ -17,6 +17,8 @@ import ( "k8s.io/client-go/tools/cache" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + clusterv1 "open-cluster-management.io/api/cluster/v1" + testingcommon "open-cluster-management.io/ocm/pkg/common/testing" testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing" ) @@ -133,7 +135,7 @@ func TestSync(t *testing.T) { for _, c := range testCases { t.Run(c.name, func(t *testing.T) { syncCtx := testingcommon.NewFakeSyncContext(t, "test") - kubeClient := kubefake.NewSimpleClientset(c.secrets...) + kubeClient := kubefake.NewClientset(c.secrets...) c.option.ManagementCoreClient = kubeClient.CoreV1() informerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute) c.option.ManagementSecretInformer = informerFactory.Core().V1().Secrets().Informer() @@ -195,3 +197,7 @@ func (f *fakeDriver) Process( func (f *fakeDriver) InformerHandler(_ any) (cache.SharedIndexInformer, factory.EventFilterFunc) { return nil, nil } + +func (f *fakeDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster { + return cluster +} diff --git a/pkg/registration/spoke/registration/creating_controller.go b/pkg/registration/spoke/registration/creating_controller.go index f37f3b175..9a6c6931d 100644 --- a/pkg/registration/spoke/registration/creating_controller.go +++ b/pkg/registration/spoke/registration/creating_controller.go @@ -23,28 +23,26 @@ var ( CreatingControllerSyncInterval = 60 * time.Minute ) +type ManagedClusterDecorator func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster + // managedClusterCreatingController creates a ManagedCluster on hub cluster during the spoke agent bootstrap phase type managedClusterCreatingController struct { - clusterName string - spokeExternalServerURLs []string - spokeCABundle []byte - clusterAnnotations map[string]string - hubClusterClient clientset.Interface + clusterName string + clusterDecorators []ManagedClusterDecorator + hubClusterClient clientset.Interface } // NewManagedClusterCreatingController creates a new managedClusterCreatingController on the managed cluster. func NewManagedClusterCreatingController( - clusterName string, spokeExternalServerURLs []string, annotations map[string]string, - spokeCABundle []byte, + clusterName string, + decorators []ManagedClusterDecorator, hubClusterClient clientset.Interface, recorder events.Recorder) factory.Controller { c := &managedClusterCreatingController{ - clusterName: clusterName, - spokeExternalServerURLs: spokeExternalServerURLs, - spokeCABundle: spokeCABundle, - clusterAnnotations: commonhelpers.FilterClusterAnnotations(annotations), - hubClusterClient: hubClusterClient, + clusterName: clusterName, + hubClusterClient: hubClusterClient, + clusterDecorators: decorators, } return factory.New(). @@ -69,20 +67,12 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac if errors.IsNotFound(err) { managedCluster := &clusterv1.ManagedCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: c.clusterName, - Annotations: c.clusterAnnotations, + Name: c.clusterName, }, } - if len(c.spokeExternalServerURLs) != 0 { - var managedClusterClientConfigs []clusterv1.ClientConfig - for _, serverURL := range c.spokeExternalServerURLs { - managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{ - URL: serverURL, - CABundle: c.spokeCABundle, - }) - } - managedCluster.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs + for _, decorator := range c.clusterDecorators { + managedCluster = decorator(managedCluster) } _, err = c.hubClusterClient.ClusterV1().ManagedClusters().Create(ctx, managedCluster, metav1.CreateOptions{}) @@ -94,37 +84,17 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac return nil } - // do not update ManagedClusterClientConfigs in ManagedCluster if spokeExternalServerURLs is empty - if len(c.spokeExternalServerURLs) == 0 { - return nil + managedCluster := existingCluster.DeepCopy() + for _, decorator := range c.clusterDecorators { + managedCluster = decorator(managedCluster) } - // merge ClientConfig - managedClusterClientConfigs := existingCluster.Spec.ManagedClusterClientConfigs - for _, serverURL := range c.spokeExternalServerURLs { - isIncludeByExisting := false - for _, existingClientConfig := range existingCluster.Spec.ManagedClusterClientConfigs { - if serverURL == existingClientConfig.URL { - isIncludeByExisting = true - break - } - } - - if !isIncludeByExisting { - managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{ - URL: serverURL, - CABundle: c.spokeCABundle, - }) - } - } - if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedClusterClientConfigs) { + if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedCluster.Spec.ManagedClusterClientConfigs) { return nil } // update ManagedClusterClientConfigs in ManagedCluster - clusterCopy := existingCluster.DeepCopy() - clusterCopy.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs - _, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, clusterCopy, metav1.UpdateOptions{}) + _, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{}) // ManagedClusterClientConfigs in ManagedCluster is only allowed updated during bootstrap. // After bootstrap secret expired, an unauthorized error will be got, skip it if skipUnauthorizedError(err) != nil { @@ -141,3 +111,40 @@ func skipUnauthorizedError(err error) error { return err } + +func AnnotationDecorator(annotations map[string]string) ManagedClusterDecorator { + return func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster { + filteredAnnotations := commonhelpers.FilterClusterAnnotations(annotations) + if cluster.Annotations == nil { + cluster.Annotations = make(map[string]string) + } + for key, value := range filteredAnnotations { + cluster.Annotations[key] = value + } + return cluster + } +} + +// ClientConfigDecorator merge ClientConfig +func ClientConfigDecorator(externalServerURLs []string, caBundle []byte) ManagedClusterDecorator { + return func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster { + for _, serverURL := range externalServerURLs { + isIncludeByExisting := false + for _, existingClientConfig := range cluster.Spec.ManagedClusterClientConfigs { + if serverURL == existingClientConfig.URL { + isIncludeByExisting = true + break + } + } + + if !isIncludeByExisting { + cluster.Spec.ManagedClusterClientConfigs = append( + cluster.Spec.ManagedClusterClientConfigs, clusterv1.ClientConfig{ + URL: serverURL, + CABundle: caBundle, + }) + } + } + return cluster + } +} diff --git a/pkg/registration/spoke/registration/creating_controller_test.go b/pkg/registration/spoke/registration/creating_controller_test.go index 7201e2485..3ed52abd8 100644 --- a/pkg/registration/spoke/registration/creating_controller_test.go +++ b/pkg/registration/spoke/registration/creating_controller_test.go @@ -59,13 +59,14 @@ func TestCreateSpokeCluster(t *testing.T) { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...) ctrl := managedClusterCreatingController{ - clusterName: testinghelpers.TestManagedClusterName, - spokeExternalServerURLs: []string{testSpokeExternalServerUrl}, - spokeCABundle: []byte("testcabundle"), - hubClusterClient: clusterClient, - clusterAnnotations: map[string]string{ - "agent.open-cluster-management.io/test": "true", + clusterName: testinghelpers.TestManagedClusterName, + clusterDecorators: []ManagedClusterDecorator{ + AnnotationDecorator(map[string]string{ + "agent.open-cluster-management.io/test": "true", + }), + ClientConfigDecorator([]string{testSpokeExternalServerUrl}, []byte("testcabundle")), }, + hubClusterClient: clusterClient, } syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, "")) diff --git a/pkg/registration/spoke/spokeagent.go b/pkg/registration/spoke/spokeagent.go index e16af4d19..38fa23578 100644 --- a/pkg/registration/spoke/spokeagent.go +++ b/pkg/registration/spoke/spokeagent.go @@ -254,8 +254,12 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context, // start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster spokeClusterCreatingController := registration.NewManagedClusterCreatingController( - o.agentOptions.SpokeClusterName, o.registrationOption.SpokeExternalServerURLs, o.registrationOption.ClusterAnnotations, - spokeClusterCABundle, + o.agentOptions.SpokeClusterName, + []registration.ManagedClusterDecorator{ + registration.AnnotationDecorator(o.registrationOption.ClusterAnnotations), + registration.ClientConfigDecorator(o.registrationOption.SpokeExternalServerURLs, spokeClusterCABundle), + o.driver.ManagedClusterDecorator, + }, bootstrapClusterClient, recorder, )