Update API discovery cache even when incomplete
Previously, if any APIs on the cluster were having issues, the controller
could not use any new API information. Now, the cache can still be updated
with partial results. Additionally, API discovery errors during startup
will no longer prevent the controller from functioning.

Refs:
 - https://issues.redhat.com/browse/ACM-5491

Signed-off-by: Justin Kulikauskas <jkulikau@redhat.com>
JustinKuli committed May 18, 2023
1 parent 7c79594 commit da127d8
Showing 3 changed files with 126 additions and 68 deletions.
97 changes: 53 additions & 44 deletions controllers/configurationpolicy_controller.go
@@ -178,13 +178,16 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(
}

start := time.Now()
policiesList := policyv1.ConfigurationPolicyList{}

var skipLoop bool
var discoveryErr error

if len(r.apiResourceList) == 0 || len(r.apiGroups) == 0 {
discoveryErr = r.refreshDiscoveryInfo()
discoveryErr := r.refreshDiscoveryInfo()

// If there was an error and no API information was received, then skip the loop.
if discoveryErr != nil && (len(r.apiResourceList) == 0 || len(r.apiGroups) == 0) {
skipLoop = true
}
}

// If it's been more than 10 minutes since the last refresh, then refresh the discovery info, but ignore
@@ -202,49 +205,45 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(
skipLoop = true
}

if !skipLoop && (discoveryErr == nil || cleanupImmediately) {
if cleanupImmediately || !skipLoop {
policiesList := policyv1.ConfigurationPolicyList{}

// This retrieves the policies from the controller-runtime cache populated by the watch.
err := r.List(context.TODO(), &policiesList)
if err != nil {
log.Error(err, "Failed to list the ConfigurationPolicy objects to evaluate")
} else {
// This is done every loop cycle since the channel needs to be variable in size to
// account for the number of policies changing.
policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items))
var wg sync.WaitGroup

skipLoop = true
}
} else {
skipLoop = true
}

// This is done every loop cycle since the channel needs to be variable in size to account for the number of
// policies changing.
policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items))
var wg sync.WaitGroup
log.Info("Processing the policies", "count", len(policiesList.Items))

if !skipLoop {
log.Info("Processing the policies", "count", len(policiesList.Items))
// Initialize the related object map
policyRelatedObjectMap = sync.Map{}

// Initialize the related object map
policyRelatedObjectMap = sync.Map{}
for i := 0; i < int(r.EvaluationConcurrency); i++ {
wg.Add(1)

for i := 0; i < int(r.EvaluationConcurrency); i++ {
wg.Add(1)
go r.handlePolicyWorker(policyQueue, &wg)
}

go r.handlePolicyWorker(policyQueue, &wg)
}
for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
}

for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
// handle each template in each policy
policyQueue <- &policy
}

// handle each template in each policy
policyQueue <- &policy
close(policyQueue)
wg.Wait()
}
}

close(policyQueue)
wg.Wait()

// Update the related object metric after policy processing
if r.EnableMetrics {
updateRelatedObjectMetric()
@@ -288,35 +287,45 @@ func (r *ConfigurationPolicyReconciler) handlePolicyWorker(
}
}

// refreshDiscoveryInfo tries to discover all the available APIs on the cluster, and update the
// cached information. If it encounters an error, it may update the cache with partial results,
// if those seem "better" than what's in the current cache.
func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error {
log.V(2).Info("Refreshing the discovery info")
r.lock.Lock()
defer func() { r.lock.Unlock() }()

dd := r.TargetK8sClient.Discovery()

_, apiResourceList, err := dd.ServerGroupsAndResources()
if err != nil {
log.Error(err, "Could not get the API resource list")

return err
_, apiResourceList, resourceErr := dd.ServerGroupsAndResources()
if resourceErr != nil {
log.Error(resourceErr, "Could not get the full API resource list")
}

r.apiResourceList = apiResourceList
if resourceErr == nil || (len(apiResourceList) > len(r.discoveryInfo.apiResourceList)) {
// update the list if it's complete, or if it's "better" than the old one
r.discoveryInfo.apiResourceList = apiResourceList
}

apiGroups, err := restmapper.GetAPIGroupResources(dd)
if err != nil {
log.Error(err, "Could not get the API groups list")
apiGroups, groupErr := restmapper.GetAPIGroupResources(dd)
if groupErr != nil {
log.Error(groupErr, "Could not get the full API groups list")
}

return err
if (resourceErr == nil && groupErr == nil) || (len(apiGroups) > len(r.discoveryInfo.apiGroups)) {
// update the list if it's complete, or if it's "better" than the old one
r.discoveryInfo.apiGroups = apiGroups
}

r.apiGroups = apiGroups
// Reset the OpenAPI cache in case the CRDs were updated since the last fetch.
r.openAPIParser = openapi.NewOpenAPIParser(dd)
r.discoveryLastRefreshed = time.Now().UTC()
r.discoveryInfo.discoveryLastRefreshed = time.Now().UTC()

return nil
if resourceErr != nil {
return resourceErr
}

return groupErr // can be nil
}

// shouldEvaluatePolicy will determine if the policy is ready for evaluation by examining the
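For reference, here is a minimal, self-contained sketch (not the controller's actual code) of the partial-refresh pattern used in refreshDiscoveryInfo above: keep whatever discovery data the server returns if it is complete, or if it covers more than the current cache. The discoveryCache type, its field names, and the refresh method are illustrative stand-ins.

package example

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/discovery"
)

// discoveryCache is a simplified stand-in for the controller's cached discovery info.
type discoveryCache struct {
	apiResourceList []*metav1.APIResourceList
	lastRefreshed   time.Time
}

// refresh queries the API server and updates the cache even on partial failures,
// as long as the partial result covers more than what is already cached.
func (c *discoveryCache) refresh(dd discovery.DiscoveryInterface) error {
	_, resourceList, err := dd.ServerGroupsAndResources()
	if err != nil {
		// With aggregated APIs, discovery failures are often partial and
		// resourceList still contains everything that could be discovered.
		fmt.Println("incomplete API discovery:", err)
	}

	// Update the cache if the result is complete, or "better" than the old one.
	if err == nil || len(resourceList) > len(c.apiResourceList) {
		c.apiResourceList = resourceList
	}

	c.lastRefreshed = time.Now().UTC()

	return err
}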
84 changes: 60 additions & 24 deletions test/e2e/case18_discovery_refresh_test.go
@@ -11,39 +11,41 @@ import (
. "github.com/onsi/gomega"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"open-cluster-management.io/config-policy-controller/test/utils"
)

const (
case18PolicyName = "policy-c18"
case18Policy = "../resources/case18_discovery_refresh/policy.yaml"
case18PolicyTemplateName = "policy-c18-template"
case18PolicyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml"
case18PolicyTemplate = "../resources/case18_discovery_refresh/policy-template.yaml"
case18ConfigMapName = "c18-configmap"
)
var _ = Describe("Test discovery info refresh", Ordered, func() {
const (
policyName = "policy-c18"
policyYaml = "../resources/case18_discovery_refresh/policy.yaml"
policyTemplateName = "policy-c18-template"
policyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml"
policyTemplateYaml = "../resources/case18_discovery_refresh/policy-template.yaml"
configMapName = "c18-configmap"
badAPIServiceYaml = "../resources/case18_discovery_refresh/bad-apiservice.yaml"
)

var _ = Describe("Test discovery info refresh", func() {
It("Verifies that the discovery info is refreshed after a CRD is installed", func() {
By("Creating " + case18PolicyName + " on managed")
utils.Kubectl("apply", "-f", case18Policy, "-n", testNamespace)
By("Creating " + policyName + " on managed")
utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace)
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyName,
policyName,
testNamespace,
true,
defaultTimeoutSeconds,
)
Expect(policy).NotTo(BeNil())

By("Verifying " + case18PolicyName + " becomes compliant")
By("Verifying " + policyName + " becomes compliant")
Eventually(func() interface{} {
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyName,
policyName,
testNamespace,
true,
defaultTimeoutSeconds,
@@ -54,12 +56,33 @@ var _ = Describe("Test discovery info refresh", func() {
})

It("Verifies that the discovery info is refreshed when a template references a new object kind", func() {
By("Adding a non-functional API service")
utils.Kubectl("apply", "-f", badAPIServiceYaml)

By("Checking that the API causes an error during API discovery")
Eventually(
func() error {
cmd := exec.Command("kubectl", "api-resources")

err := cmd.Start()
if err != nil {
return nil // Just retry. If this consistently has an error, the test should fail.
}

err = cmd.Wait()

return err
},
defaultTimeoutSeconds,
1,
).ShouldNot(BeNil())

By("Creating the prerequisites on managed")
// This needs to be wrapped in an eventually since the object can't be created immediately after the CRD
// is created.
Eventually(
func() interface{} {
cmd := exec.Command("kubectl", "apply", "-f", case18PolicyTemplatePreReqs)
cmd := exec.Command("kubectl", "apply", "-f", policyTemplatePreReqs)

err := cmd.Start()
if err != nil {
@@ -74,24 +97,24 @@ var _ = Describe("Test discovery info refresh", func() {
1,
).Should(BeNil())

By("Creating " + case18PolicyTemplateName + " on managed")
utils.Kubectl("apply", "-f", case18PolicyTemplate, "-n", testNamespace)
By("Creating " + policyTemplateName + " on managed")
utils.Kubectl("apply", "-f", policyTemplateYaml, "-n", testNamespace)
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyTemplateName,
policyTemplateName,
testNamespace,
true,
defaultTimeoutSeconds,
)
Expect(policy).NotTo(BeNil())

By("Verifying " + case18PolicyTemplateName + " becomes compliant")
By("Verifying " + policyTemplateName + " becomes compliant")
Eventually(func() interface{} {
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyTemplateName,
policyTemplateName,
testNamespace,
true,
defaultTimeoutSeconds,
@@ -101,9 +124,9 @@ var _ = Describe("Test discovery info refresh", func() {
}, defaultTimeoutSeconds, 1).Should(Equal("Compliant"))
})

It("Cleans up", func() {
AfterAll(func() {
err := clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete(
context.TODO(), case18PolicyName, metav1.DeleteOptions{},
context.TODO(), policyName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
@@ -117,7 +140,7 @@ var _ = Describe("Test discovery info refresh", func() {
}

err = clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete(
context.TODO(), case18PolicyTemplateName, metav1.DeleteOptions{},
context.TODO(), policyTemplateName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
@@ -131,7 +154,20 @@ var _ = Describe("Test discovery info refresh", func() {
}

err = clientManaged.CoreV1().ConfigMaps("default").Delete(
context.TODO(), case18ConfigMapName, metav1.DeleteOptions{},
context.TODO(), configMapName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
}

gvrAPIService := schema.GroupVersionResource{
Group: "apiregistration.k8s.io",
Version: "v1",
Resource: "apiservices",
}

err = clientManagedDynamic.Resource(gvrAPIService).Delete(
context.TODO(), "v1beta1.pizza.example.com", metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
13 changes: 13 additions & 0 deletions test/resources/case18_discovery_refresh/bad-apiservice.yaml
@@ -0,0 +1,13 @@
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta1.pizza.example.com
spec:
group: pizza.example.com
groupPriorityMinimum: 100
insecureSkipTLSVerify: true
service:
name: pizza-server
namespace: kube-system
version: v1beta1
versionPriority: 100
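
Not part of the commit, but for context on why this APIService breaks discovery: its backend service does not exist, so aggregated discovery for pizza.example.com fails while every other group still resolves. The sketch below shows one way a client can observe that partial failure; the kubeconfig handling is an assumption for illustration only.

package main

import (
	"fmt"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default location; purely illustrative.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	client := kubernetes.NewForConfigOrDie(config)

	// With the bad APIService applied, this typically returns partial results
	// plus an aggregated discovery error rather than failing outright.
	groups, resources, err := client.Discovery().ServerGroupsAndResources()
	if discovery.IsGroupDiscoveryFailedError(err) {
		fmt.Printf("partial discovery: %d groups, %d resource lists: %v\n",
			len(groups), len(resources), err)
	} else if err != nil {
		panic(err)
	}
}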
