From 049994499251e22ff843e1d14654daf281ddc900 Mon Sep 17 00:00:00 2001 From: Justin Kulikauskas Date: Wed, 17 May 2023 10:53:04 -0400 Subject: [PATCH] Update API discovery cache even when incomplete Previously, if there were any APIs having issues on the cluster, no new API information could be used by the controller. Now, partial results can still be updated. Additionally, API discovery errors during startup will not prevent the controller from functioning at all. Refs: - https://issues.redhat.com/browse/ACM-5491 Signed-off-by: Justin Kulikauskas --- controllers/configurationpolicy_controller.go | 97 ++++++++++--------- test/e2e/case18_discovery_refresh_test.go | 84 +++++++++++----- .../bad-apiservice.yaml | 13 +++ 3 files changed, 126 insertions(+), 68 deletions(-) create mode 100644 test/resources/case18_discovery_refresh/bad-apiservice.yaml diff --git a/controllers/configurationpolicy_controller.go b/controllers/configurationpolicy_controller.go index 32b2a628..cd068020 100644 --- a/controllers/configurationpolicy_controller.go +++ b/controllers/configurationpolicy_controller.go @@ -178,13 +178,16 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies( } start := time.Now() - policiesList := policyv1.ConfigurationPolicyList{} var skipLoop bool - var discoveryErr error if len(r.apiResourceList) == 0 || len(r.apiGroups) == 0 { - discoveryErr = r.refreshDiscoveryInfo() + discoveryErr := r.refreshDiscoveryInfo() + + // If there was an error and no API information was received, then skip the loop. + if discoveryErr != nil && (len(r.apiResourceList) == 0 || len(r.apiGroups) == 0) { + skipLoop = true + } } // If it's been more than 10 minutes since the last refresh, then refresh the discovery info, but ignore @@ -202,49 +205,45 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies( skipLoop = true } - if !skipLoop && (discoveryErr == nil || cleanupImmediately) { + if cleanupImmediately || !skipLoop { + policiesList := policyv1.ConfigurationPolicyList{} + // This retrieves the policies from the controller-runtime cache populated by the watch. err := r.List(context.TODO(), &policiesList) if err != nil { log.Error(err, "Failed to list the ConfigurationPolicy objects to evaluate") + } else { + // This is done every loop cycle since the channel needs to be variable in size to + // account for the number of policies changing. + policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items)) + var wg sync.WaitGroup - skipLoop = true - } - } else { - skipLoop = true - } - - // This is done every loop cycle since the channel needs to be variable in size to account for the number of - // policies changing. - policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items)) - var wg sync.WaitGroup + log.Info("Processing the policies", "count", len(policiesList.Items)) - if !skipLoop { - log.Info("Processing the policies", "count", len(policiesList.Items)) + // Initialize the related object map + policyRelatedObjectMap = sync.Map{} - // Initialize the related object map - policyRelatedObjectMap = sync.Map{} + for i := 0; i < int(r.EvaluationConcurrency); i++ { + wg.Add(1) - for i := 0; i < int(r.EvaluationConcurrency); i++ { - wg.Add(1) + go r.handlePolicyWorker(policyQueue, &wg) + } - go r.handlePolicyWorker(policyQueue, &wg) - } + for i := range policiesList.Items { + policy := policiesList.Items[i] + if !shouldEvaluatePolicy(&policy, cleanupImmediately) { + continue + } - for i := range policiesList.Items { - policy := policiesList.Items[i] - if !shouldEvaluatePolicy(&policy, cleanupImmediately) { - continue + // handle each template in each policy + policyQueue <- &policy } - // handle each template in each policy - policyQueue <- &policy + close(policyQueue) + wg.Wait() } } - close(policyQueue) - wg.Wait() - // Update the related object metric after policy processing if r.EnableMetrics { updateRelatedObjectMetric() @@ -288,6 +287,9 @@ func (r *ConfigurationPolicyReconciler) handlePolicyWorker( } } +// refreshDiscoveryInfo tries to discover all the available APIs on the cluster, and update the +// cached information. If it encounters an error, it may update the cache with partial results, +// if those seem "better" than what's in the current cache. func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error { log.V(2).Info("Refreshing the discovery info") r.lock.Lock() @@ -295,28 +297,35 @@ func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error { dd := r.TargetK8sClient.Discovery() - _, apiResourceList, err := dd.ServerGroupsAndResources() - if err != nil { - log.Error(err, "Could not get the API resource list") - - return err + _, apiResourceList, resourceErr := dd.ServerGroupsAndResources() + if resourceErr != nil { + log.Error(resourceErr, "Could not get the full API resource list") } - r.apiResourceList = apiResourceList + if resourceErr == nil || (len(apiResourceList) > len(r.discoveryInfo.apiResourceList)) { + // update the list if it's complete, or if it's "better" than the old one + r.discoveryInfo.apiResourceList = apiResourceList + } - apiGroups, err := restmapper.GetAPIGroupResources(dd) - if err != nil { - log.Error(err, "Could not get the API groups list") + apiGroups, groupErr := restmapper.GetAPIGroupResources(dd) + if groupErr != nil { + log.Error(groupErr, "Could not get the full API groups list") + } - return err + if (resourceErr == nil && groupErr == nil) || (len(apiGroups) > len(r.discoveryInfo.apiGroups)) { + // update the list if it's complete, or if it's "better" than the old one + r.discoveryInfo.apiGroups = apiGroups } - r.apiGroups = apiGroups // Reset the OpenAPI cache in case the CRDs were updated since the last fetch. r.openAPIParser = openapi.NewOpenAPIParser(dd) - r.discoveryLastRefreshed = time.Now().UTC() + r.discoveryInfo.discoveryLastRefreshed = time.Now().UTC() - return nil + if resourceErr != nil { + return resourceErr + } + + return groupErr // can be nil } // shouldEvaluatePolicy will determine if the policy is ready for evaluation by examining the diff --git a/test/e2e/case18_discovery_refresh_test.go b/test/e2e/case18_discovery_refresh_test.go index 7b80c043..1336e797 100644 --- a/test/e2e/case18_discovery_refresh_test.go +++ b/test/e2e/case18_discovery_refresh_test.go @@ -11,39 +11,41 @@ import ( . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "open-cluster-management.io/config-policy-controller/test/utils" ) -const ( - case18PolicyName = "policy-c18" - case18Policy = "../resources/case18_discovery_refresh/policy.yaml" - case18PolicyTemplateName = "policy-c18-template" - case18PolicyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml" - case18PolicyTemplate = "../resources/case18_discovery_refresh/policy-template.yaml" - case18ConfigMapName = "c18-configmap" -) +var _ = Describe("Test discovery info refresh", Ordered, func() { + const ( + policyName = "policy-c18" + policyYaml = "../resources/case18_discovery_refresh/policy.yaml" + policyTemplateName = "policy-c18-template" + policyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml" + policyTemplateYaml = "../resources/case18_discovery_refresh/policy-template.yaml" + configMapName = "c18-configmap" + badAPIServiceYaml = "../resources/case18_discovery_refresh/bad-apiservice.yaml" + ) -var _ = Describe("Test discovery info refresh", func() { It("Verifies that the discovery info is refreshed after a CRD is installed", func() { - By("Creating " + case18PolicyName + " on managed") - utils.Kubectl("apply", "-f", case18Policy, "-n", testNamespace) + By("Creating " + policyName + " on managed") + utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace) policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyName, + policyName, testNamespace, true, defaultTimeoutSeconds, ) Expect(policy).NotTo(BeNil()) - By("Verifying " + case18PolicyName + " becomes compliant") + By("Verifying " + policyName + " becomes compliant") Eventually(func() interface{} { policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyName, + policyName, testNamespace, true, defaultTimeoutSeconds, @@ -54,12 +56,33 @@ var _ = Describe("Test discovery info refresh", func() { }) It("Verifies that the discovery info is refreshed when a template references a new object kind", func() { + By("Adding a non-functional API service") + utils.Kubectl("apply", "-f", badAPIServiceYaml) + + By("Checking that the API causes an error during API discovery") + Eventually( + func() error { + cmd := exec.Command("kubectl", "api-resources") + + err := cmd.Start() + if err != nil { + return nil // Just retry. If this consistently has an error, the test should fail. + } + + err = cmd.Wait() + + return err + }, + defaultTimeoutSeconds, + 1, + ).ShouldNot(BeNil()) + By("Creating the prerequisites on managed") // This needs to be wrapped in an eventually since the object can't be created immediately after the CRD // is created. Eventually( func() interface{} { - cmd := exec.Command("kubectl", "apply", "-f", case18PolicyTemplatePreReqs) + cmd := exec.Command("kubectl", "apply", "-f", policyTemplatePreReqs) err := cmd.Start() if err != nil { @@ -74,24 +97,24 @@ var _ = Describe("Test discovery info refresh", func() { 1, ).Should(BeNil()) - By("Creating " + case18PolicyTemplateName + " on managed") - utils.Kubectl("apply", "-f", case18PolicyTemplate, "-n", testNamespace) + By("Creating " + policyTemplateName + " on managed") + utils.Kubectl("apply", "-f", policyTemplateYaml, "-n", testNamespace) policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyTemplateName, + policyTemplateName, testNamespace, true, defaultTimeoutSeconds, ) Expect(policy).NotTo(BeNil()) - By("Verifying " + case18PolicyTemplateName + " becomes compliant") + By("Verifying " + policyTemplateName + " becomes compliant") Eventually(func() interface{} { policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyTemplateName, + policyTemplateName, testNamespace, true, defaultTimeoutSeconds, @@ -101,9 +124,9 @@ var _ = Describe("Test discovery info refresh", func() { }, defaultTimeoutSeconds, 1).Should(Equal("Compliant")) }) - It("Cleans up", func() { + AfterAll(func() { err := clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete( - context.TODO(), case18PolicyName, metav1.DeleteOptions{}, + context.TODO(), policyName, metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) @@ -117,7 +140,7 @@ var _ = Describe("Test discovery info refresh", func() { } err = clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete( - context.TODO(), case18PolicyTemplateName, metav1.DeleteOptions{}, + context.TODO(), policyTemplateName, metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) @@ -131,7 +154,20 @@ var _ = Describe("Test discovery info refresh", func() { } err = clientManaged.CoreV1().ConfigMaps("default").Delete( - context.TODO(), case18ConfigMapName, metav1.DeleteOptions{}, + context.TODO(), configMapName, metav1.DeleteOptions{}, + ) + if !k8serrors.IsNotFound(err) { + Expect(err).ToNot(HaveOccurred()) + } + + gvrAPIService := schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + } + + err = clientManagedDynamic.Resource(gvrAPIService).Delete( + context.TODO(), "v1beta1.pizza.example.com", metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) diff --git a/test/resources/case18_discovery_refresh/bad-apiservice.yaml b/test/resources/case18_discovery_refresh/bad-apiservice.yaml new file mode 100644 index 00000000..a8a4a3cf --- /dev/null +++ b/test/resources/case18_discovery_refresh/bad-apiservice.yaml @@ -0,0 +1,13 @@ +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta1.pizza.example.com +spec: + group: pizza.example.com + groupPriorityMinimum: 100 + insecureSkipTLSVerify: true + service: + name: pizza-server + namespace: kube-system + version: v1beta1 + versionPriority: 100