Skip to content

Commit

Permalink
Watch namespaces to evaluate when they change
Browse files Browse the repository at this point in the history
Previously, the evaluation interval could prevent policies from being
applied to new namespaces when they appeared. Now, a new controller
watches namespaces on the target cluster, and can signal to the config
policy controller when a policy's selected namespaces have changed, so
that the policy can be re-evaluated promptly.

A new controller-manager is used in hosted mode, in order to create a
cache for the namespaces on the target cluster. This change required
some setup adjustments in order to start both managers in this case.

Refs:
 - https://issues.redhat.com/browse/ACM-6428

Signed-off-by: Justin Kulikauskas <jkulikau@redhat.com>
  • Loading branch information
JustinKuli committed Aug 8, 2023
1 parent 26ff938 commit b5fbfcf
Show file tree
Hide file tree
Showing 14 changed files with 788 additions and 141 deletions.
4 changes: 3 additions & 1 deletion api/v1/configurationpolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (t Target) String() string {
return fmt.Sprintf(fmtSelectorStr, t.Include, t.Exclude, *t.MatchLabels, *t.MatchExpressions)
}

// Configures the minimum elapsed time before a ConfigurationPolicy is reevaluated
// Configures the minimum elapsed time before a ConfigurationPolicy is reevaluated. If the policy
// spec is changed, or if the list of namespaces selected by the policy changes, the policy may be
// evaluated regardless of the settings here.
type EvaluationInterval struct {
//+kubebuilder:validation:Pattern=`^(?:(?:(?:[0-9]+(?:.[0-9])?)(?:h|m|s|(?:ms)|(?:us)|(?:ns)))|never)+$`
// The minimum elapsed time before a ConfigurationPolicy is reevaluated when in the compliant state. Set this to
Expand Down
35 changes: 26 additions & 9 deletions controllers/configurationpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type ConfigurationPolicyReconciler struct {
TargetK8sClient kubernetes.Interface
TargetK8sDynamicClient dynamic.Interface
TargetK8sConfig *rest.Config
SelectorReconciler common.SelectorReconciler
// Whether custom metrics collection is enabled
EnableMetrics bool
discoveryInfo
Expand Down Expand Up @@ -149,6 +150,8 @@ func (r *ConfigurationPolicyReconciler) Reconcile(ctx context.Context, request c
prometheus.Labels{"policy": fmt.Sprintf("%s/%s", request.Namespace, request.Name)})
_ = policyUserErrorsCounter.DeletePartialMatch(prometheus.Labels{"template": request.Name})
_ = policySystemErrorsCounter.DeletePartialMatch(prometheus.Labels{"template": request.Name})

r.SelectorReconciler.Stop(request.Name)
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -236,7 +239,7 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(

for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
if !r.shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
}

Expand Down Expand Up @@ -346,7 +349,9 @@ func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error {
// met, then that will also trigger an evaluation. If cleanupImmediately is true, then only policies
// with finalizers will be ready for evaluation regardless of the last evaluation.
// cleanupImmediately should be set true when the controller is getting uninstalled.
func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediately bool) bool {
func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
policy *policyv1.ConfigurationPolicy, cleanupImmediately bool,
) bool {
log := log.WithValues("policy", policy.GetName())

// If it's time to clean up such as when the config-policy-controller is being uninstalled, only evaluate policies
Expand All @@ -356,19 +361,19 @@ func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediate
}

if policy.ObjectMeta.DeletionTimestamp != nil {
log.V(2).Info("The policy has been deleted and is waiting for object cleanup. Will evaluate it now.")
log.V(1).Info("The policy has been deleted and is waiting for object cleanup. Will evaluate it now.")

return true
}

if policy.Status.LastEvaluatedGeneration != policy.Generation {
log.V(2).Info("The policy has been updated. Will evaluate it now.")
log.V(1).Info("The policy has been updated. Will evaluate it now.")

return true
}

if policy.Status.LastEvaluated == "" {
log.V(2).Info("The policy's status.lastEvaluated field is not set. Will evaluate it now.")
log.V(1).Info("The policy's status.lastEvaluated field is not set. Will evaluate it now.")

return true
}
Expand All @@ -387,13 +392,23 @@ func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediate
} else if policy.Status.ComplianceState == policyv1.NonCompliant && policy.Spec != nil {
interval, err = policy.Spec.EvaluationInterval.GetNonCompliantInterval()
} else {
log.V(2).Info("The policy has an unknown compliance. Will evaluate it now.")
log.V(1).Info("The policy has an unknown compliance. Will evaluate it now.")

return true
}

usesSelector := policy.Spec.NamespaceSelector.MatchLabels != nil ||
policy.Spec.NamespaceSelector.MatchExpressions != nil ||
len(policy.Spec.NamespaceSelector.Include) != 0

if usesSelector && r.SelectorReconciler.HasUpdate(policy.Name) {
log.V(1).Info("There was an update for this policy's namespaces. Will evaluate it now.")

return true
}

if errors.Is(err, policyv1.ErrIsNever) {
log.Info("Skipping the policy evaluation due to the spec.evaluationInterval value being set to never")
log.V(1).Info("Skipping the policy evaluation due to the spec.evaluationInterval value being set to never")

return false
} else if err != nil {
Expand All @@ -409,7 +424,7 @@ func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediate

nextEvaluation := lastEvaluated.Add(interval)
if nextEvaluation.Sub(time.Now().UTC()) > 0 {
log.Info("Skipping the policy evaluation due to the policy not reaching the evaluation interval")
log.V(1).Info("Skipping the policy evaluation due to the policy not reaching the evaluation interval")

return false
}
Expand Down Expand Up @@ -473,11 +488,13 @@ func (r *ConfigurationPolicyReconciler) getObjectTemplateDetails(
selector := plc.Spec.NamespaceSelector
// If MatchLabels/MatchExpressions/Include were not provided, return no namespaces
if selector.MatchLabels == nil && selector.MatchExpressions == nil && len(selector.Include) == 0 {
r.SelectorReconciler.Stop(plc.Name)

log.Info("namespaceSelector is empty. Skipping namespace retrieval.")
} else {
// If an error occurred in the NamespaceSelector, update the policy status and abort
var err error
selectedNamespaces, err = common.GetSelectedNamespaces(r.TargetK8sClient, selector)
selectedNamespaces, err = r.SelectorReconciler.Get(plc.Name, plc.Spec.NamespaceSelector)
if err != nil {
errMsg := "Error filtering namespaces with provided namespaceSelector"
log.Error(
Expand Down
19 changes: 18 additions & 1 deletion controllers/configurationpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,10 @@ func TestShouldEvaluatePolicy(t *testing.T) {
},
}

r := &ConfigurationPolicyReconciler{
SelectorReconciler: &fakeSR{},
}

for _, test := range tests {
test := test

Expand All @@ -970,14 +974,27 @@ func TestShouldEvaluatePolicy(t *testing.T) {
policy.ObjectMeta.DeletionTimestamp = test.deletionTimestamp
policy.ObjectMeta.Finalizers = test.finalizers

if actual := shouldEvaluatePolicy(policy, test.cleanupImmediately); actual != test.expected {
if actual := r.shouldEvaluatePolicy(policy, test.cleanupImmediately); actual != test.expected {
t.Fatalf("expected %v but got %v", test.expected, actual)
}
},
)
}
}

// fakeSR is a no-op test double for the SelectorReconciler interface used by
// ConfigurationPolicyReconciler, letting shouldEvaluatePolicy be exercised in
// unit tests without a live namespace watch.
type fakeSR struct{}

// Get reports no selected namespaces and no selector error.
func (r *fakeSR) Get(_ string, _ policyv1.Target) ([]string, error) {
	return nil, nil
}

// HasUpdate reports that no namespace changes are pending for the named policy,
// so the selector never forces an early re-evaluation in tests.
func (r *fakeSR) HasUpdate(_ string) bool {
	return false
}

// Stop is a no-op; the fake has no per-policy watch state to clean up.
func (r *fakeSR) Stop(_ string) {
}

func TestShouldHandleSingleKeyFalse(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ spec:
properties:
evaluationInterval:
description: Configures the minimum elapsed time before a ConfigurationPolicy
is reevaluated
is reevaluated. If the policy spec is changed, or if the list of
namespaces selected by the policy changes, the policy may be evaluated
regardless of the settings here.
properties:
compliant:
description: The minimum elapsed time before a ConfigurationPolicy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ spec:
properties:
evaluationInterval:
description: Configures the minimum elapsed time before a ConfigurationPolicy
is reevaluated
is reevaluated. If the policy spec is changed, or if the list of
namespaces selected by the policy changes, the policy may be evaluated
regardless of the settings here.
properties:
compliant:
description: The minimum elapsed time before a ConfigurationPolicy
Expand Down
87 changes: 82 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"

"github.com/go-logr/zapr"
Expand All @@ -20,6 +21,7 @@ import (
corev1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -28,6 +30,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -202,6 +205,18 @@ func main() {
log.Info("Skipping restrictions on the ConfigurationPolicy cache because watchNamespace is empty")
}

nsTransform := func(obj interface{}) (interface{}, error) {
ns := obj.(*corev1.Namespace)
guttedNS := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
Labels: ns.Labels,
},
}

return guttedNS, nil
}

// Set default manager options
options := manager.Options{
MetricsBindAddress: opts.metricsAddr,
Expand All @@ -210,7 +225,12 @@ func main() {
HealthProbeBindAddress: opts.probeAddr,
LeaderElection: opts.enableLeaderElection,
LeaderElectionID: "config-policy-controller.open-cluster-management.io",
NewCache: cache.BuilderWithOptions(cache.Options{SelectorsByObject: cacheSelectors}),
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: cacheSelectors,
TransformByObject: map[client.Object]toolscache.TransformFunc{
&corev1.Namespace{}: nsTransform,
},
}),
// Disable the cache for Secrets to avoid a watch getting created when the `policy-encryption-key`
// Secret is retrieved. Special cache handling is done by the controller.
ClientDisableCacheFor: []client.Object{&corev1.Secret{}},
Expand Down Expand Up @@ -252,12 +272,14 @@ func main() {
var targetK8sClient kubernetes.Interface
var targetK8sDynamicClient dynamic.Interface
var targetK8sConfig *rest.Config
var nsSelMgr manager.Manager // A separate controller-manager is needed in hosted mode

if opts.targetKubeConfig == "" {
targetK8sConfig = cfg
targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
} else {
nsSelMgr = mgr
} else { // "Hosted mode"
var err error

targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", opts.targetKubeConfig)
Expand All @@ -272,6 +294,18 @@ func main() {
targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)

nsSelMgr, err = manager.New(targetK8sConfig, manager.Options{
NewCache: cache.BuilderWithOptions(cache.Options{
TransformByObject: map[client.Object]toolscache.TransformFunc{
&corev1.Namespace{}: nsTransform,
},
}),
})
if err != nil {
log.Error(err, "Unable to create manager from target kube config")
os.Exit(1)
}

log.Info(
"Overrode the target Kubernetes cluster for policy evaluation and enforcement",
"path", opts.targetKubeConfig,
Expand All @@ -280,6 +314,14 @@ func main() {

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

nsSelReconciler := common.NamespaceSelectorReconciler{
Client: nsSelMgr.GetClient(),
}
if err = nsSelReconciler.SetupWithManager(nsSelMgr); err != nil {
log.Error(err, "Unable to create controller", "controller", "NamespaceSelector")
os.Exit(1)
}

reconciler := controllers.ConfigurationPolicyReconciler{
Client: mgr.GetClient(),
DecryptionConcurrency: opts.decryptionConcurrency,
Expand All @@ -290,6 +332,7 @@ func main() {
TargetK8sClient: targetK8sClient,
TargetK8sDynamicClient: targetK8sDynamicClient,
TargetK8sConfig: targetK8sConfig,
SelectorReconciler: &nsSelReconciler,
EnableMetrics: opts.enableMetrics,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -353,10 +396,44 @@ func main() {
log.Info("Addon status reporting is not enabled")
}

log.Info("Starting manager")
log.Info("Starting managers")

var wg sync.WaitGroup
var errorExit bool

wg.Add(1)

go func() {
if err := mgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")

managerCancel()

errorExit = true
}

wg.Done()
}()

if opts.targetKubeConfig != "" { // "hosted mode"
wg.Add(1)

go func() {
if err := nsSelMgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")

managerCancel()

errorExit = true
}

wg.Done()
}()
}

wg.Wait()

if err := mgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")
if errorExit {
os.Exit(1)
}
}
Expand Down
Loading

0 comments on commit b5fbfcf

Please sign in to comment.