diff --git a/controllers/configurationpolicy_controller.go b/controllers/configurationpolicy_controller.go index 32b2a628..cd068020 100644 --- a/controllers/configurationpolicy_controller.go +++ b/controllers/configurationpolicy_controller.go @@ -178,13 +178,16 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies( } start := time.Now() - policiesList := policyv1.ConfigurationPolicyList{} var skipLoop bool - var discoveryErr error if len(r.apiResourceList) == 0 || len(r.apiGroups) == 0 { - discoveryErr = r.refreshDiscoveryInfo() + discoveryErr := r.refreshDiscoveryInfo() + + // If there was an error and no API information was received, then skip the loop. + if discoveryErr != nil && (len(r.apiResourceList) == 0 || len(r.apiGroups) == 0) { + skipLoop = true + } } // If it's been more than 10 minutes since the last refresh, then refresh the discovery info, but ignore @@ -202,49 +205,45 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies( skipLoop = true } - if !skipLoop && (discoveryErr == nil || cleanupImmediately) { + if cleanupImmediately || !skipLoop { + policiesList := policyv1.ConfigurationPolicyList{} + // This retrieves the policies from the controller-runtime cache populated by the watch. err := r.List(context.TODO(), &policiesList) if err != nil { log.Error(err, "Failed to list the ConfigurationPolicy objects to evaluate") + } else { + // This is done every loop cycle since the channel needs to be variable in size to + // account for the number of policies changing. + policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items)) + var wg sync.WaitGroup - skipLoop = true - } - } else { - skipLoop = true - } - - // This is done every loop cycle since the channel needs to be variable in size to account for the number of - // policies changing. - policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items)) - var wg sync.WaitGroup + log.Info("Processing the policies", "count", len(policiesList.Items)) - if !skipLoop { - log.Info("Processing the policies", "count", len(policiesList.Items)) + // Initialize the related object map + policyRelatedObjectMap = sync.Map{} - // Initialize the related object map - policyRelatedObjectMap = sync.Map{} + for i := 0; i < int(r.EvaluationConcurrency); i++ { + wg.Add(1) - for i := 0; i < int(r.EvaluationConcurrency); i++ { - wg.Add(1) + go r.handlePolicyWorker(policyQueue, &wg) + } - go r.handlePolicyWorker(policyQueue, &wg) - } + for i := range policiesList.Items { + policy := policiesList.Items[i] + if !shouldEvaluatePolicy(&policy, cleanupImmediately) { + continue + } - for i := range policiesList.Items { - policy := policiesList.Items[i] - if !shouldEvaluatePolicy(&policy, cleanupImmediately) { - continue + // handle each template in each policy + policyQueue <- &policy } - // handle each template in each policy - policyQueue <- &policy + close(policyQueue) + wg.Wait() } } - close(policyQueue) - wg.Wait() - // Update the related object metric after policy processing if r.EnableMetrics { updateRelatedObjectMetric() @@ -288,6 +287,9 @@ func (r *ConfigurationPolicyReconciler) handlePolicyWorker( } } +// refreshDiscoveryInfo tries to discover all the available APIs on the cluster, and update the +// cached information. If it encounters an error, it may update the cache with partial results, +// if those seem "better" than what's in the current cache. func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error { log.V(2).Info("Refreshing the discovery info") r.lock.Lock() @@ -295,28 +297,35 @@ func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error { dd := r.TargetK8sClient.Discovery() - _, apiResourceList, err := dd.ServerGroupsAndResources() - if err != nil { - log.Error(err, "Could not get the API resource list") - - return err + _, apiResourceList, resourceErr := dd.ServerGroupsAndResources() + if resourceErr != nil { + log.Error(resourceErr, "Could not get the full API resource list") } - r.apiResourceList = apiResourceList + if resourceErr == nil || (len(apiResourceList) > len(r.discoveryInfo.apiResourceList)) { + // update the list if it's complete, or if it's "better" than the old one + r.discoveryInfo.apiResourceList = apiResourceList + } - apiGroups, err := restmapper.GetAPIGroupResources(dd) - if err != nil { - log.Error(err, "Could not get the API groups list") + apiGroups, groupErr := restmapper.GetAPIGroupResources(dd) + if groupErr != nil { + log.Error(groupErr, "Could not get the full API groups list") + } - return err + if (resourceErr == nil && groupErr == nil) || (len(apiGroups) > len(r.discoveryInfo.apiGroups)) { + // update the list if it's complete, or if it's "better" than the old one + r.discoveryInfo.apiGroups = apiGroups } - r.apiGroups = apiGroups // Reset the OpenAPI cache in case the CRDs were updated since the last fetch. r.openAPIParser = openapi.NewOpenAPIParser(dd) - r.discoveryLastRefreshed = time.Now().UTC() + r.discoveryInfo.discoveryLastRefreshed = time.Now().UTC() - return nil + if resourceErr != nil { + return resourceErr + } + + return groupErr // can be nil } // shouldEvaluatePolicy will determine if the policy is ready for evaluation by examining the diff --git a/test/e2e/case18_discovery_refresh_test.go b/test/e2e/case18_discovery_refresh_test.go index 7b80c043..1336e797 100644 --- a/test/e2e/case18_discovery_refresh_test.go +++ b/test/e2e/case18_discovery_refresh_test.go @@ -11,39 +11,41 @@ import ( . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "open-cluster-management.io/config-policy-controller/test/utils" ) -const ( - case18PolicyName = "policy-c18" - case18Policy = "../resources/case18_discovery_refresh/policy.yaml" - case18PolicyTemplateName = "policy-c18-template" - case18PolicyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml" - case18PolicyTemplate = "../resources/case18_discovery_refresh/policy-template.yaml" - case18ConfigMapName = "c18-configmap" -) +var _ = Describe("Test discovery info refresh", Ordered, func() { + const ( + policyName = "policy-c18" + policyYaml = "../resources/case18_discovery_refresh/policy.yaml" + policyTemplateName = "policy-c18-template" + policyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml" + policyTemplateYaml = "../resources/case18_discovery_refresh/policy-template.yaml" + configMapName = "c18-configmap" + badAPIServiceYaml = "../resources/case18_discovery_refresh/bad-apiservice.yaml" + ) -var _ = Describe("Test discovery info refresh", func() { It("Verifies that the discovery info is refreshed after a CRD is installed", func() { - By("Creating " + case18PolicyName + " on managed") - utils.Kubectl("apply", "-f", case18Policy, "-n", testNamespace) + By("Creating " + policyName + " on managed") + utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace) policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyName, + policyName, testNamespace, true, defaultTimeoutSeconds, ) Expect(policy).NotTo(BeNil()) - By("Verifying " + case18PolicyName + " becomes compliant") + By("Verifying " + policyName + " becomes compliant") Eventually(func() interface{} { policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyName, + policyName, testNamespace, true, defaultTimeoutSeconds, @@ -54,12 +56,33 @@ var _ = Describe("Test discovery info refresh", func() { }) It("Verifies that the discovery info is refreshed when a template references a new object kind", func() { + By("Adding a non-functional API service") + utils.Kubectl("apply", "-f", badAPIServiceYaml) + + By("Checking that the API causes an error during API discovery") + Eventually( + func() error { + cmd := exec.Command("kubectl", "api-resources") + + err := cmd.Start() + if err != nil { + return nil // Just retry. If this consistently has an error, the test should fail. + } + + err = cmd.Wait() + + return err + }, + defaultTimeoutSeconds, + 1, + ).ShouldNot(BeNil()) + By("Creating the prerequisites on managed") // This needs to be wrapped in an eventually since the object can't be created immediately after the CRD // is created. Eventually( func() interface{} { - cmd := exec.Command("kubectl", "apply", "-f", case18PolicyTemplatePreReqs) + cmd := exec.Command("kubectl", "apply", "-f", policyTemplatePreReqs) err := cmd.Start() if err != nil { @@ -74,24 +97,24 @@ var _ = Describe("Test discovery info refresh", func() { 1, ).Should(BeNil()) - By("Creating " + case18PolicyTemplateName + " on managed") - utils.Kubectl("apply", "-f", case18PolicyTemplate, "-n", testNamespace) + By("Creating " + policyTemplateName + " on managed") + utils.Kubectl("apply", "-f", policyTemplateYaml, "-n", testNamespace) policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyTemplateName, + policyTemplateName, testNamespace, true, defaultTimeoutSeconds, ) Expect(policy).NotTo(BeNil()) - By("Verifying " + case18PolicyTemplateName + " becomes compliant") + By("Verifying " + policyTemplateName + " becomes compliant") Eventually(func() interface{} { policy := utils.GetWithTimeout( clientManagedDynamic, gvrConfigPolicy, - case18PolicyTemplateName, + policyTemplateName, testNamespace, true, defaultTimeoutSeconds, @@ -101,9 +124,9 @@ var _ = Describe("Test discovery info refresh", func() { }, defaultTimeoutSeconds, 1).Should(Equal("Compliant")) }) - It("Cleans up", func() { + AfterAll(func() { err := clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete( - context.TODO(), case18PolicyName, metav1.DeleteOptions{}, + context.TODO(), policyName, metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) @@ -117,7 +140,7 @@ var _ = Describe("Test discovery info refresh", func() { } err = clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete( - context.TODO(), case18PolicyTemplateName, metav1.DeleteOptions{}, + context.TODO(), policyTemplateName, metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) @@ -131,7 +154,20 @@ var _ = Describe("Test discovery info refresh", func() { } err = clientManaged.CoreV1().ConfigMaps("default").Delete( - context.TODO(), case18ConfigMapName, metav1.DeleteOptions{}, + context.TODO(), configMapName, metav1.DeleteOptions{}, + ) + if !k8serrors.IsNotFound(err) { + Expect(err).ToNot(HaveOccurred()) + } + + gvrAPIService := schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + } + + err = clientManagedDynamic.Resource(gvrAPIService).Delete( + context.TODO(), "v1beta1.pizza.example.com", metav1.DeleteOptions{}, ) if !k8serrors.IsNotFound(err) { Expect(err).ToNot(HaveOccurred()) diff --git a/test/resources/case18_discovery_refresh/bad-apiservice.yaml b/test/resources/case18_discovery_refresh/bad-apiservice.yaml new file mode 100644 index 00000000..a8a4a3cf --- /dev/null +++ b/test/resources/case18_discovery_refresh/bad-apiservice.yaml @@ -0,0 +1,13 @@ +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta1.pizza.example.com +spec: + group: pizza.example.com + groupPriorityMinimum: 100 + insecureSkipTLSVerify: true + service: + name: pizza-server + namespace: kube-system + version: v1beta1 + versionPriority: 100