From 56a9b2f7fe422568cc694eef878147e653efe8c5 Mon Sep 17 00:00:00 2001
From: mprahl
Date: Wed, 10 Apr 2024 10:04:33 -0400
Subject: [PATCH] Start and stop the Gatekeeper status sync controller
 dynamically

Rather than relying on a health probe failing when the Gatekeeper
installation status changes, dynamically start and stop a dedicated
manager for the status sync controller.
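
The new flow is roughly the following (a simplified sketch, not the
exact code; error handling, locking, and retries are omitted):

    w, gatekeeperInstalled, _ := newRetryWatcher()
    if gatekeeperInstalled {
        mgrCtx, mgrCtxCancel = context.WithCancel(ctx)
        go startMgr(mgrCtx) // runs the status sync controller in a dedicated manager
    }

    for {
        select {
        case <-w.Done():
            // Recreate the watcher and reconcile any installs or
            // uninstalls that were missed while the watch was down.
        case result := <-w.ResultChan():
            switch result.Type {
            case apiWatch.Added:
                // Gatekeeper was installed, so start the manager.
            case apiWatch.Deleted:
                // Gatekeeper was uninstalled, so cancel mgrCtx to stop the manager.
            }
        }
    }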

Relates: https://issues.redhat.com/browse/ACM-10966
Signed-off-by: mprahl
---
 controllers/gatekeepersync/health.go |  97 ------
 main.go                              | 448 +++++++++++++++++++++-------
 2 files changed, 332 insertions(+), 213 deletions(-)
 delete mode 100644 controllers/gatekeepersync/health.go

diff --git a/controllers/gatekeepersync/health.go b/controllers/gatekeepersync/health.go
deleted file mode 100644
index ae4ca7b3..00000000
--- a/controllers/gatekeepersync/health.go
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Contributors to the Open Cluster Management project
-
-package gatekeepersync
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"net/http"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	apiWatch "k8s.io/apimachinery/pkg/watch"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/watch"
-	"sigs.k8s.io/controller-runtime/pkg/healthz"
-
-	"open-cluster-management.io/governance-policy-framework-addon/controllers/utils"
-)
-
-// GatekeeperInstallationChecker is a health checker for a health endpoint that fails if Gatekeeper's installation
-// status changes or the passed in health checker functions fail. This is useful for Kubernetes to trigger a restart
-// to either enable or disable the gatekeeper-constraint-status-sync controller based on the Gatekeeper installation
-// status.
-func GatekeeperInstallationChecker(
-	ctx context.Context, dynamicClient dynamic.Interface, checkers ...healthz.Checker,
-) (
-	healthz.Checker, bool, error,
-) {
-	fieldSelector := fmt.Sprintf("metadata.name=constrainttemplates.%s", utils.GvkConstraintTemplate.Group)
-	timeout := int64(30)
-	crdGVR := schema.GroupVersionResource{
-		Group:    "apiextensions.k8s.io",
-		Version:  "v1",
-		Resource: "customresourcedefinitions",
-	}
-
-	listResult, err := dynamicClient.Resource(crdGVR).List(
-		ctx, metav1.ListOptions{FieldSelector: fieldSelector, TimeoutSeconds: &timeout},
-	)
-	if err != nil {
-		return nil, false, err
-	}
-
-	gatekeeperInstalled := len(listResult.Items) > 0
-	resourceVersion := listResult.GetResourceVersion()
-
-	watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
-		options.FieldSelector = fieldSelector
-
-		return dynamicClient.Resource(crdGVR).Watch(ctx, options)
-	}
-
-	w, err := watch.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
-	if err != nil {
-		return nil, false, err
-	}
-
-	var lastHealthErr error
-
-	return func(req *http.Request) error {
-		if lastHealthErr != nil {
-			return lastHealthErr
-		}
-
-		select {
-		case <-w.Done():
-			select {
-			case <-ctx.Done():
-				// Stop the retry watcher if the parent context is canceled.
-				w.Stop()
-
-				lastHealthErr = errors.New("the context is closed so the health check can no longer be performed")
-			default:
-				lastHealthErr = errors.New("the watch used by the Gatekeeper installation checker ended prematurely")
-			}
-		case result := <-w.ResultChan():
-			if !gatekeeperInstalled && result.Type == apiWatch.Added {
-				lastHealthErr = errors.New("the controller needs to restart because Gatekeeper has been installed")
-			}
-
-			if gatekeeperInstalled && result.Type == apiWatch.Deleted {
-				lastHealthErr = errors.New("the controller needs to restart because Gatekeeper has been uninstalled")
-			}
-		default:
-			for _, checker := range checkers {
-				err := checker(req)
-				if err != nil {
-					return err
-				}
-			}
-		}
-
-		return lastHealthErr
-	}, gatekeeperInstalled, nil
-}
diff --git a/main.go b/main.go
index 42802b42..2f2f706a 100644
--- a/main.go
+++ b/main.go
@@ -33,7 +33,9 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	k8sruntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	apiWatch "k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -41,8 +43,10 @@ import (
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 	"k8s.io/client-go/rest"
+	apiCache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/watch"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	"open-cluster-management.io/addon-framework/pkg/lease"
@@ -70,10 +74,12 @@ import (
 )
 
 var (
-	eventsScheme = k8sruntime.NewScheme()
-	log          = ctrl.Log.WithName("setup")
-	scheme       = k8sruntime.NewScheme()
-	eventFilter  fields.Selector
+	eventsScheme        = k8sruntime.NewScheme()
+	log                 = ctrl.Log.WithName("setup")
+	scheme              = k8sruntime.NewScheme()
+	eventFilter         fields.Selector
+	healthAddresses     = map[string]bool{}
+	healthAddressesLock = sync.RWMutex{}
 )
 
 func printVersion() {
@@ -275,12 +281,13 @@ func main() {
 		os.Exit(1)
 	}
 
-	healthAddresses := []string{mgrHealthAddr}
+	healthAddressesLock.Lock()
+	healthAddresses[mgrHealthAddr] = true
 
 	mainCtx := ctrl.SetupSignalHandler()
 	mgrCtx, mgrCtxCancel := context.WithCancel(mainCtx)
 
-	mgr := getManager(mgrCtx, mgrOptionsBase, mgrHealthAddr, hubCfg, managedCfg)
+	mgr := getManager(mgrOptionsBase, mgrHealthAddr, hubCfg, managedCfg)
 
 	var hubMgr manager.Manager
 
@@ -291,11 +298,38 @@ func main() {
 			os.Exit(1)
 		}
 
-		healthAddresses = append(healthAddresses, hubMgrHealthAddr)
+		healthAddresses[hubMgrHealthAddr] = true
 
 		hubMgr = getHubManager(mgrOptionsBase, hubMgrHealthAddr, hubCfg, managedCfg)
 	}
 
+	healthAddressesLock.Unlock()
+
+	// Add Gatekeeper controller if enabled
+	if !tool.Options.DisableGkSync {
+		discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(managedCfg)
+
+		serverVersion, err := discoveryClient.ServerVersion()
+		if err != nil {
+			log.Error(err, "unable to detect the managed cluster's Kubernetes version")
+			os.Exit(1)
+		}
+
+		// Gatekeeper is not supported on such old versions of Kubernetes, so just always disable it in this case. In
+		// particular, CRD v1 is not available before Kubernetes v1.16.0.
+		if semver.Compare(serverVersion.GitVersion, "v1.16.0") >= 0 {
+			dynamicClient := dynamic.NewForConfigOrDie(managedCfg)
+
+			err = gatekeeperInstallationChecker(mgrCtx, managedCfg, dynamicClient, mgrOptionsBase)
+			if err != nil {
+				log.Error(err, "unable to determine if Gatekeeper is installed")
+				os.Exit(1)
+			}
+		}
+	} else {
+		log.Info("The Gatekeeper integration is set to disabled")
+	}
+
 	log.Info("Adding controllers to managers")
 
 	var queue workqueue.RateLimitingInterface
@@ -316,7 +350,7 @@ func main() {
 	wg.Add(1)
 
 	go func() {
-		err := startHealthProxy(mgrCtx, &wg, healthAddresses...)
+		err := startHealthProxy(mgrCtx, &wg)
 		if err != nil {
 			log.Error(err, "failed to start the health endpoint proxy")
 
@@ -418,7 +452,7 @@ func main() {
 
 // getManager return a controller Manager object that watches on the managed cluster and has the controllers registered.
 func getManager(
-	mgrCtx context.Context, options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
+	options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
 ) manager.Manager {
 	crdLabelSelector := labels.SelectorFromSet(map[string]string{utils.PolicyTypeLabel: "template"})
 
@@ -432,9 +466,6 @@ func getManager(
 			&extensionsv1beta1.CustomResourceDefinition{}: {
 				Label: crdLabelSelector,
 			},
-			&admissionregistration.ValidatingWebhookConfiguration{}: {
-				Field: fields.SelectorFromSet(fields.Set{"metadata.name": gatekeepersync.GatekeeperWebhookName}),
-			},
 			&v1.Event{}: {
 				Namespaces: map[string]cache.Config{
 					tool.Options.ClusterNamespace: {
@@ -510,17 +541,8 @@ func getManager(
 		os.Exit(1)
 	}
 
-	healthCheck := configChecker.Check
-
-	// Add Gatekeeper controller if enabled
-	if !tool.Options.DisableGkSync {
-		healthCheck = addGkControllerToManager(mgrCtx, mgr, managedCfg, configChecker.Check)
-	} else {
-		log.Info("The Gatekeeper integration is set to disabled")
-	}
-
 	//+kubebuilder:scaffold:builder
-	if err := mgr.AddHealthzCheck("healthz", healthCheck); err != nil {
+	if err := mgr.AddHealthzCheck("healthz", configChecker.Check); err != nil {
 		log.Error(err, "unable to set up health check")
 		os.Exit(1)
 	}
@@ -596,114 +618,43 @@ func getHubManager(
 	return mgr
 }
 
-// addGkControllerToManager will configure the input manager with the gatekeeper-constraint-status-sync controller if
-// Gatekeeper is installed and the Kubernetes cluster is v1.16.0+. The returned health checker will wrap the input
-// health checkers but also return unhealthy if the Gatekeeper installation status chanages.
-func addGkControllerToManager(
-	mgrCtx context.Context, mgr manager.Manager, managedCfg *rest.Config, healthCheck healthz.Checker,
-) healthz.Checker {
-	discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(managedCfg)
-
-	serverVersion, err := discoveryClient.ServerVersion()
-	if err != nil {
-		log.Error(err, "unable to detect the managed cluster's Kubernetes version")
-		os.Exit(1)
-	}
-
-	// Gatekeeper is not supported on such old versions of Kubernetes, so just always disable it in this case. In
-	// particular, CRD v1 is not available before Kubernetes v1.16.0.
-	if semver.Compare(serverVersion.GitVersion, "v1.16.0") < 0 {
-		return healthCheck
-	}
-
-	dynamicClient := dynamic.NewForConfigOrDie(managedCfg)
-
-	gkHealthCheck, gatekeeperInstalled, err := gatekeepersync.GatekeeperInstallationChecker(
-		mgrCtx, dynamicClient, healthCheck,
-	)
-	if err != nil {
-		log.Error(err, "unable to determine if Gatekeeper is installed")
-		os.Exit(1)
-	}
-
-	// Only run the controller if Gatekeeper is installed
-	if !gatekeeperInstalled {
-		log.Info(
-			"Gatekeeper is not installed so the gatekeepersync controller will be disabled. If running in a " +
-				"cluster, the health endpoint will become unhealthy if Gatekeeper is installed to trigger a " +
-				"restart.",
-		)
-
-		return gkHealthCheck
-	}
-
-	log.Info(
-		"Starting the controller since a Gatekeeper installation was detected",
-		"controller", gatekeepersync.ControllerName,
-	)
-
-	clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig())
-	instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok
-
-	constraintsReconciler, constraintEvents := depclient.NewControllerRuntimeSource()
-
-	constraintsWatcher, err := depclient.New(managedCfg, constraintsReconciler, nil)
-	if err != nil {
-		log.Error(err, "Unable to create constraints watcher")
-		os.Exit(1)
-	}
-
-	go func() {
-		err := constraintsWatcher.Start(mgrCtx)
-		if err != nil {
-			panic(err)
-		}
-	}()
-
-	// Wait until the constraints watcher has started.
-	<-constraintsWatcher.Started()
-
-	if err = (&gatekeepersync.GatekeeperConstraintReconciler{
-		Client: mgr.GetClient(),
-		ComplianceEventSender: utils.ComplianceEventSender{
-			ClusterNamespace: tool.Options.ClusterNamespace,
-			ClientSet:        clientset,
-			ControllerName:   gatekeepersync.ControllerName,
-			InstanceName:     instanceName,
-		},
-		DynamicClient:        dynamicClient,
-		ConstraintsWatcher:   constraintsWatcher,
-		Scheme:               mgr.GetScheme(),
-		ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
-	}).SetupWithManager(mgr, constraintEvents); err != nil {
-		log.Error(err, "unable to create controller", "controller", gatekeepersync.ControllerName)
-		os.Exit(1)
-	}
-
-	return gkHealthCheck
-}
-
-// startHealthProxy responds to /healthz and /readyz HTTP requests and combines the status together of the input
-// addresses representing the managers. The HTTP server gracefully shutsdown when the input context is closed.
+// startHealthProxy responds to /healthz and /readyz HTTP requests and combines the statuses of the addresses in the
+// healthAddresses map representing the managers. The HTTP server gracefully shuts down when the input context is closed.
 // The wg.Done() is only called after the HTTP server fails to start or after graceful shutdown of the HTTP server.
-func startHealthProxy(ctx context.Context, wg *sync.WaitGroup, addresses ...string) error {
+func startHealthProxy(ctx context.Context, wg *sync.WaitGroup) error {
 	log := ctrl.Log.WithName("healthproxy")
 
+	httpClient := http.Client{
+		Timeout: 5 * time.Second,
+	}
+
 	for _, endpoint := range []string{"/healthz", "/readyz"} {
 		endpoint := endpoint
 
 		http.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
+			healthAddressesLock.RLock()
+			addresses := make([]string, 0, len(healthAddresses))
+
+			// Populate a separate slice to avoid holding the lock too long.
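+			// Re-reading the map on each request means managers that start or stop later, such as the
+			// Gatekeeper status sync manager, are automatically included in the health evaluation.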
+			for address := range healthAddresses {
+				addresses = append(addresses, address)
+			}
+
+			healthAddressesLock.RUnlock()
+
 			for _, address := range addresses {
-				req, err := http.NewRequestWithContext(
-					ctx, http.MethodGet, fmt.Sprintf("http://%s%s", address, endpoint), nil,
-				)
+				healthURL := fmt.Sprintf("http://%s%s", address, endpoint)
+
+				req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
 				if err != nil {
 					http.Error(w, fmt.Sprintf("manager: %s", err.Error()), http.StatusInternalServerError)
 
 					return
 				}
 
-				resp, err := http.DefaultClient.Do(req)
+				resp, err := httpClient.Do(req)
 				if err != nil {
 					http.Error(w, fmt.Sprintf("manager: %s", err.Error()), http.StatusInternalServerError)
 
@@ -951,3 +900,268 @@ func addControllers(
 		os.Exit(1)
 	}
 }
+
+// gatekeeperInstallationChecker starts or stops the gatekeeper-constraint-status-sync controller when Gatekeeper's
+// installation status changes. The controller must be off when Gatekeeper is not installed.
+func gatekeeperInstallationChecker(
+	ctx context.Context, managedCfg *rest.Config, dynamicClient dynamic.Interface, mgrOptions manager.Options,
+) error {
+	fieldSelector := fmt.Sprintf("metadata.name=constrainttemplates.%s", utils.GvkConstraintTemplate.Group)
+	timeout := int64(30)
+	crdGVR := schema.GroupVersionResource{
+		Group:    "apiextensions.k8s.io",
+		Version:  "v1",
+		Resource: "customresourcedefinitions",
+	}
+
+	newRetryWatcher := func() (*watch.RetryWatcher, bool, error) {
+		listResult, err := dynamicClient.Resource(crdGVR).List(
+			ctx, metav1.ListOptions{FieldSelector: fieldSelector, TimeoutSeconds: &timeout},
+		)
+		if err != nil {
+			return nil, false, err
+		}
+
+		gatekeeperInstalled := len(listResult.Items) > 0
+		resourceVersion := listResult.GetResourceVersion()
+
+		watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
+			options.FieldSelector = fieldSelector
+
+			return dynamicClient.Resource(crdGVR).Watch(ctx, options)
+		}
+
+		w, err := watch.NewRetryWatcher(resourceVersion, &apiCache.ListWatch{WatchFunc: watchFunc})
+
+		return w, gatekeeperInstalled, err
+	}
+
+	healthAddress, err := getFreeLocalAddr()
+	if err != nil {
+		return err
+	}
+
+	// Disable the metrics endpoint for this manager. Note that since they both use the global
+	// metrics registry, metrics for this manager are still exposed by the other manager.
+	mgrOptions.Metrics.BindAddress = "0"
+	mgrOptions.LeaderElectionID = "governance-policy-framework-addon3.open-cluster-management.io"
+	mgrOptions.HealthProbeBindAddress = healthAddress
+	mgrOptions.Cache = cache.Options{
+		ByObject: map[client.Object]cache.ByObject{
+			&admissionregistration.ValidatingWebhookConfiguration{}: {
+				Field: fields.SelectorFromSet(fields.Set{"metadata.name": gatekeepersync.GatekeeperWebhookName}),
+			},
+		},
+		DefaultNamespaces: map[string]cache.Config{
+			tool.Options.ClusterNamespace: {},
+		},
+	}
+
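+	// startMgr creates a manager running the gatekeeper-constraint-status-sync controller and blocks until the
+	// manager stops. Setup failures are retried once per second until they succeed or ctx is canceled.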
Will retry.", + ) + + break + } + + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + log.Error( + err, + "Unable to set up ready check on the Gatekeeper constraint status sync manager. Will retry.", + ) + + break + } + + log.Info( + "Starting the controller since a Gatekeeper installation was detected", + "controller", gatekeepersync.ControllerName, + ) + + clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig()) + instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok + + constraintsReconciler, constraintEvents := depclient.NewControllerRuntimeSource() + + constraintsWatcher, err := depclient.New(managedCfg, constraintsReconciler, nil) + if err != nil { + log.Error(err, "Unable to create the constraints watcher. Will retry.") + + break + } + + // Create a separate context for the dynamic watcher to be able to cancel that separately in case + // starting the manager needs to be retried. + dynamicWatcherCtx, dynamicWatchCancel := context.WithCancel(ctx) + defer dynamicWatchCancel() + + go func() { + err := constraintsWatcher.Start(dynamicWatcherCtx) + if err != nil { + panic(err) + } + }() + + // Wait until the constraints watcher has started. + <-constraintsWatcher.Started() + + if err = (&gatekeepersync.GatekeeperConstraintReconciler{ + Client: mgr.GetClient(), + ComplianceEventSender: utils.ComplianceEventSender{ + ClusterNamespace: tool.Options.ClusterNamespace, + ClientSet: clientset, + ControllerName: gatekeepersync.ControllerName, + InstanceName: instanceName, + }, + DynamicClient: dynamicClient, + ConstraintsWatcher: constraintsWatcher, + Scheme: mgr.GetScheme(), + ConcurrentReconciles: int(tool.Options.EvaluationConcurrency), + }).SetupWithManager(mgr, constraintEvents); err != nil { + log.Error( + err, "Unable to create controller. Will retry.", "controller", gatekeepersync.ControllerName, + ) + + // Stop the dynamic watcher since the manager will get recreated. + dynamicWatchCancel() + + break + } + + // Add the health bind address to be considered by the health proxy. + healthAddressesLock.Lock() + healthAddresses[healthAddress] = true + healthAddressesLock.Unlock() + + log.Info("Gatekeeper is installed. Starting the gatekeeper-constraint-status-sync controller.") + + // This blocks until the manager stops. + err = mgr.Start(ctx) + + // Remove the health bind address after the manager stops. + healthAddressesLock.Lock() + delete(healthAddresses, healthAddress) + healthAddressesLock.Unlock() + + if err != nil { + log.Error(err, "Unable to start the Gatekeeper constraint status sync manager. Will retry.") + + // Stop the dynamic watcher since the manager will get recreated. + dynamicWatchCancel() + + break + } + + return + } + + time.Sleep(time.Second) + } + } + + w, gatekeeperInstalled, err := newRetryWatcher() + if err != nil { + return err + } + + var mgrCtx context.Context + var mgrCtxCancel context.CancelFunc + + mgrRunning := false + + if gatekeeperInstalled { + mgrRunning = true + + mgrCtx, mgrCtxCancel = context.WithCancel(ctx) + + go startMgr(mgrCtx) + } + + go func() { + for { + select { + case <-w.Done(): + select { + case <-ctx.Done(): + // Stop the retry watcher if the parent context is canceled. + w.Stop() + + return + default: + newWatcher, gatekeeperInstalled, err := newRetryWatcher() + if err != nil { + log.Error( + err, + "Failed to restart the CRD watcher to monitor the Gatekeeper installation. 
Will retry.", + ) + + time.Sleep(time.Second) + + continue + } + + w = newWatcher + + // Handle missing events during the watcher downtime. + if gatekeeperInstalled { + if !mgrRunning { + mgrRunning = true + + mgrCtx, mgrCtxCancel = context.WithCancel(ctx) + + go startMgr(mgrCtx) + } + } else if mgrRunning && mgrCtxCancel != nil { + mgrCtxCancel() + } + } + case result := <-w.ResultChan(): + // If the CRD is added, then Gatekeeper is installed. + if result.Type == apiWatch.Added { + if mgrRunning { + continue + } + + mgrRunning = true + + mgrCtx, mgrCtxCancel = context.WithCancel(ctx) + + go startMgr(mgrCtx) + } else if result.Type == apiWatch.Deleted { + log.Info("Gatekeeper was uninstalled. Stopping the gatekeeper-constraint-status-sync controller.") + + if mgrCtxCancel != nil { + mgrCtxCancel() + } + + mgrRunning = false + } + } + } + }() + + return nil +}