Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update API discovery cache even when incomplete #132

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 53 additions & 44 deletions controllers/configurationpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,16 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(
}

start := time.Now()
policiesList := policyv1.ConfigurationPolicyList{}

var skipLoop bool
var discoveryErr error

if len(r.apiResourceList) == 0 || len(r.apiGroups) == 0 {
discoveryErr = r.refreshDiscoveryInfo()
discoveryErr := r.refreshDiscoveryInfo()

// If there was an error and no API information was received, then skip the loop.
if discoveryErr != nil && (len(r.apiResourceList) == 0 || len(r.apiGroups) == 0) {
skipLoop = true
}
}

// If it's been more than 10 minutes since the last refresh, then refresh the discovery info, but ignore
Expand All @@ -202,49 +205,45 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(
skipLoop = true
}

if !skipLoop && (discoveryErr == nil || cleanupImmediately) {
if cleanupImmediately || !skipLoop {
policiesList := policyv1.ConfigurationPolicyList{}

// This retrieves the policies from the controller-runtime cache populated by the watch.
err := r.List(context.TODO(), &policiesList)
if err != nil {
log.Error(err, "Failed to list the ConfigurationPolicy objects to evaluate")
} else {
// This is done every loop cycle since the channel needs to be variable in size to
// account for the number of policies changing.
policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items))
var wg sync.WaitGroup

skipLoop = true
}
} else {
skipLoop = true
}

// This is done every loop cycle since the channel needs to be variable in size to account for the number of
// policies changing.
policyQueue := make(chan *policyv1.ConfigurationPolicy, len(policiesList.Items))
var wg sync.WaitGroup
log.Info("Processing the policies", "count", len(policiesList.Items))

if !skipLoop {
log.Info("Processing the policies", "count", len(policiesList.Items))
// Initialize the related object map
policyRelatedObjectMap = sync.Map{}

// Initialize the related object map
policyRelatedObjectMap = sync.Map{}
for i := 0; i < int(r.EvaluationConcurrency); i++ {
wg.Add(1)

for i := 0; i < int(r.EvaluationConcurrency); i++ {
wg.Add(1)
go r.handlePolicyWorker(policyQueue, &wg)
}

go r.handlePolicyWorker(policyQueue, &wg)
}
for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
}

for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
// handle each template in each policy
policyQueue <- &policy
}

// handle each template in each policy
policyQueue <- &policy
close(policyQueue)
wg.Wait()
}
}

close(policyQueue)
wg.Wait()

// Update the related object metric after policy processing
if r.EnableMetrics {
updateRelatedObjectMetric()
Expand Down Expand Up @@ -288,35 +287,45 @@ func (r *ConfigurationPolicyReconciler) handlePolicyWorker(
}
}

// refreshDiscoveryInfo tries to discover all the available APIs on the cluster, and update the
// cached information. If it encounters an error, it may update the cache with partial results,
// if those seem "better" than what's in the current cache.
func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error {
log.V(2).Info("Refreshing the discovery info")
r.lock.Lock()
defer func() { r.lock.Unlock() }()

dd := r.TargetK8sClient.Discovery()

_, apiResourceList, err := dd.ServerGroupsAndResources()
if err != nil {
log.Error(err, "Could not get the API resource list")

return err
_, apiResourceList, resourceErr := dd.ServerGroupsAndResources()
if resourceErr != nil {
log.Error(resourceErr, "Could not get the full API resource list")
}

r.apiResourceList = apiResourceList
if resourceErr == nil || (len(apiResourceList) > len(r.discoveryInfo.apiResourceList)) {
// update the list if it's complete, or if it's "better" than the old one
r.discoveryInfo.apiResourceList = apiResourceList
}

apiGroups, err := restmapper.GetAPIGroupResources(dd)
if err != nil {
log.Error(err, "Could not get the API groups list")
apiGroups, groupErr := restmapper.GetAPIGroupResources(dd)
if groupErr != nil {
log.Error(groupErr, "Could not get the full API groups list")
}

return err
if (resourceErr == nil && groupErr == nil) || (len(apiGroups) > len(r.discoveryInfo.apiGroups)) {
// update the list if it's complete, or if it's "better" than the old one
r.discoveryInfo.apiGroups = apiGroups
}

r.apiGroups = apiGroups
// Reset the OpenAPI cache in case the CRDs were updated since the last fetch.
r.openAPIParser = openapi.NewOpenAPIParser(dd)
r.discoveryLastRefreshed = time.Now().UTC()
r.discoveryInfo.discoveryLastRefreshed = time.Now().UTC()

return nil
if resourceErr != nil {
return resourceErr
}

return groupErr // can be nil
}

// shouldEvaluatePolicy will determine if the policy is ready for evaluation by examining the
Expand Down
84 changes: 60 additions & 24 deletions test/e2e/case18_discovery_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,41 @@ import (
. "github.com/onsi/gomega"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"open-cluster-management.io/config-policy-controller/test/utils"
)

const (
case18PolicyName = "policy-c18"
case18Policy = "../resources/case18_discovery_refresh/policy.yaml"
case18PolicyTemplateName = "policy-c18-template"
case18PolicyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml"
case18PolicyTemplate = "../resources/case18_discovery_refresh/policy-template.yaml"
case18ConfigMapName = "c18-configmap"
)
var _ = Describe("Test discovery info refresh", Ordered, func() {
const (
policyName = "policy-c18"
policyYaml = "../resources/case18_discovery_refresh/policy.yaml"
policyTemplateName = "policy-c18-template"
policyTemplatePreReqs = "../resources/case18_discovery_refresh/prereqs-for-template-policy.yaml"
policyTemplateYaml = "../resources/case18_discovery_refresh/policy-template.yaml"
configMapName = "c18-configmap"
badAPIServiceYaml = "../resources/case18_discovery_refresh/bad-apiservice.yaml"
)

var _ = Describe("Test discovery info refresh", func() {
It("Verifies that the discovery info is refreshed after a CRD is installed", func() {
By("Creating " + case18PolicyName + " on managed")
utils.Kubectl("apply", "-f", case18Policy, "-n", testNamespace)
By("Creating " + policyName + " on managed")
utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace)
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyName,
policyName,
testNamespace,
true,
defaultTimeoutSeconds,
)
Expect(policy).NotTo(BeNil())

By("Verifying " + case18PolicyName + " becomes compliant")
By("Verifying " + policyName + " becomes compliant")
Eventually(func() interface{} {
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyName,
policyName,
testNamespace,
true,
defaultTimeoutSeconds,
Expand All @@ -54,12 +56,33 @@ var _ = Describe("Test discovery info refresh", func() {
})

It("Verifies that the discovery info is refreshed when a template references a new object kind", func() {
By("Adding a non-functional API service")
utils.Kubectl("apply", "-f", badAPIServiceYaml)

By("Checking that the API causes an error during API discovery")
Eventually(
func() error {
cmd := exec.Command("kubectl", "api-resources")

err := cmd.Start()
if err != nil {
return nil // Just retry. If this consistently has an error, the test should fail.
}

err = cmd.Wait()

return err
},
defaultTimeoutSeconds,
1,
).ShouldNot(BeNil())

By("Creating the prerequisites on managed")
// This needs to be wrapped in an eventually since the object can't be created immediately after the CRD
// is created.
Eventually(
func() interface{} {
cmd := exec.Command("kubectl", "apply", "-f", case18PolicyTemplatePreReqs)
cmd := exec.Command("kubectl", "apply", "-f", policyTemplatePreReqs)

err := cmd.Start()
if err != nil {
Expand All @@ -74,24 +97,24 @@ var _ = Describe("Test discovery info refresh", func() {
1,
).Should(BeNil())

By("Creating " + case18PolicyTemplateName + " on managed")
utils.Kubectl("apply", "-f", case18PolicyTemplate, "-n", testNamespace)
By("Creating " + policyTemplateName + " on managed")
utils.Kubectl("apply", "-f", policyTemplateYaml, "-n", testNamespace)
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyTemplateName,
policyTemplateName,
testNamespace,
true,
defaultTimeoutSeconds,
)
Expect(policy).NotTo(BeNil())

By("Verifying " + case18PolicyTemplateName + " becomes compliant")
By("Verifying " + policyTemplateName + " becomes compliant")
Eventually(func() interface{} {
policy := utils.GetWithTimeout(
clientManagedDynamic,
gvrConfigPolicy,
case18PolicyTemplateName,
policyTemplateName,
testNamespace,
true,
defaultTimeoutSeconds,
Expand All @@ -101,9 +124,9 @@ var _ = Describe("Test discovery info refresh", func() {
}, defaultTimeoutSeconds, 1).Should(Equal("Compliant"))
})

It("Cleans up", func() {
AfterAll(func() {
err := clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete(
context.TODO(), case18PolicyName, metav1.DeleteOptions{},
context.TODO(), policyName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
Expand All @@ -117,7 +140,7 @@ var _ = Describe("Test discovery info refresh", func() {
}

err = clientManagedDynamic.Resource(gvrConfigPolicy).Namespace(testNamespace).Delete(
context.TODO(), case18PolicyTemplateName, metav1.DeleteOptions{},
context.TODO(), policyTemplateName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
Expand All @@ -131,7 +154,20 @@ var _ = Describe("Test discovery info refresh", func() {
}

err = clientManaged.CoreV1().ConfigMaps("default").Delete(
context.TODO(), case18ConfigMapName, metav1.DeleteOptions{},
context.TODO(), configMapName, metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
}

gvrAPIService := schema.GroupVersionResource{
Group: "apiregistration.k8s.io",
Version: "v1",
Resource: "apiservices",
}

err = clientManagedDynamic.Resource(gvrAPIService).Delete(
context.TODO(), "v1beta1.pizza.example.com", metav1.DeleteOptions{},
)
if !k8serrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred())
Expand Down
13 changes: 13 additions & 0 deletions test/resources/case18_discovery_refresh/bad-apiservice.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta1.pizza.example.com
spec:
group: pizza.example.com
groupPriorityMinimum: 100
insecureSkipTLSVerify: true
service:
name: pizza-server
namespace: kube-system
version: v1beta1
versionPriority: 100