Watch namespaces to evaluate when they change
Previously, the evaluation interval could prevent policies from being
applied to new namespaces when they appeared. Now, a new controller
watches namespaces on the target cluster and can signal to the config
policy controller when a policy's selected namespaces have changed, so
that the policy can be re-evaluated.

Refs:
 - https://issues.redhat.com/browse/ACM-6428

Signed-off-by: Justin Kulikauskas <jkulikau@redhat.com>
JustinKuli committed Aug 8, 2023
1 parent 6e32ff4 commit 19286cc
Showing 5 changed files with 389 additions and 19 deletions.
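For orientation before the diffs: the SelectorReconciler dependency introduced by this commit exposes roughly the interface sketched below. This is inferred from the fakeSR test double and the call sites in the diff (the real definition lives in this repository's common package); the parameter names are assumptions.

// Sketch of the common.SelectorReconciler contract, inferred from fakeSR and
// the call sites below; not copied from the commit.
type SelectorReconciler interface {
	// Get returns the namespaces currently matched by the given selector for
	// the named policy and, in the real implementation, registers the policy
	// so that later namespace changes can be detected.
	Get(policyName string, selector policyv1.Target) ([]string, error)

	// HasUpdate reports whether the namespaces selected for the named policy
	// have changed since they were last retrieved.
	HasUpdate(policyName string) bool

	// Stop drops the bookkeeping for the named policy, for example when the
	// policy is deleted or no longer sets a namespaceSelector.
	Stop(policyName string)
}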
controllers/configurationpolicy_controller.go (23 changes: 20 additions & 3 deletions)
@@ -120,6 +120,7 @@ type ConfigurationPolicyReconciler struct {
TargetK8sClient kubernetes.Interface
TargetK8sDynamicClient dynamic.Interface
TargetK8sConfig *rest.Config
SelectorReconciler common.SelectorReconciler
// Whether custom metrics collection is enabled
EnableMetrics bool
discoveryInfo
@@ -149,6 +150,8 @@ func (r *ConfigurationPolicyReconciler) Reconcile(ctx context.Context, request c
prometheus.Labels{"policy": fmt.Sprintf("%s/%s", request.Namespace, request.Name)})
_ = policyUserErrorsCounter.DeletePartialMatch(prometheus.Labels{"template": request.Name})
_ = policySystemErrorsCounter.DeletePartialMatch(prometheus.Labels{"template": request.Name})

r.SelectorReconciler.Stop(request.Name)
}

return reconcile.Result{}, nil
@@ -236,7 +239,7 @@ func (r *ConfigurationPolicyReconciler) PeriodicallyExecConfigPolicies(

for i := range policiesList.Items {
policy := policiesList.Items[i]
if !shouldEvaluatePolicy(&policy, cleanupImmediately) {
if !r.shouldEvaluatePolicy(&policy, cleanupImmediately) {
continue
}

@@ -346,7 +349,9 @@ func (r *ConfigurationPolicyReconciler) refreshDiscoveryInfo() error {
// met, then that will also trigger an evaluation. If cleanupImmediately is true, then only policies
// with finalizers will be ready for evaluation regardless of the last evaluation.
// cleanupImmediately should be set true when the controller is getting uninstalled.
func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediately bool) bool {
func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
policy *policyv1.ConfigurationPolicy, cleanupImmediately bool,
) bool {
log := log.WithValues("policy", policy.GetName())

// If it's time to clean up such as when the config-policy-controller is being uninstalled, only evaluate policies
@@ -392,6 +397,16 @@ func shouldEvaluatePolicy(policy *policyv1.ConfigurationPolicy, cleanupImmediate
return true
}

usesSelector := policy.Spec.NamespaceSelector.MatchLabels != nil ||
policy.Spec.NamespaceSelector.MatchExpressions != nil ||
len(policy.Spec.NamespaceSelector.Include) != 0

if usesSelector && r.SelectorReconciler.HasUpdate(policy.Name) {
log.V(2).Info("There was an update for this policy's namespaces. Will evaluate it now.")

return true
}

if errors.Is(err, policyv1.ErrIsNever) {
log.Info("Skipping the policy evaluation due to the spec.evaluationInterval value being set to never")

@@ -473,11 +488,13 @@ func (r *ConfigurationPolicyReconciler) getObjectTemplateDetails(
selector := plc.Spec.NamespaceSelector
// If MatchLabels/MatchExpressions/Include were not provided, return no namespaces
if selector.MatchLabels == nil && selector.MatchExpressions == nil && len(selector.Include) == 0 {
r.SelectorReconciler.Stop(plc.Name)

log.Info("namespaceSelector is empty. Skipping namespace retrieval.")
} else {
// If an error occurred in the NamespaceSelector, update the policy status and abort
var err error
selectedNamespaces, err = common.GetSelectedNamespaces(r.TargetK8sClient, selector)
selectedNamespaces, err = r.SelectorReconciler.Get(plc.Name, plc.Spec.NamespaceSelector)
if err != nil {
errMsg := "Error filtering namespaces with provided namespaceSelector"
log.Error(
controllers/configurationpolicy_controller_test.go (19 changes: 18 additions & 1 deletion)
@@ -954,6 +954,10 @@ func TestShouldEvaluatePolicy(t *testing.T) {
},
}

r := &ConfigurationPolicyReconciler{
SelectorReconciler: &fakeSR{},
}

for _, test := range tests {
test := test

@@ -970,14 +974,27 @@
policy.ObjectMeta.DeletionTimestamp = test.deletionTimestamp
policy.ObjectMeta.Finalizers = test.finalizers

if actual := shouldEvaluatePolicy(policy, test.cleanupImmediately); actual != test.expected {
if actual := r.shouldEvaluatePolicy(policy, test.cleanupImmediately); actual != test.expected {
t.Fatalf("expected %v but got %v", test.expected, actual)
}
},
)
}
}

type fakeSR struct{}

func (r *fakeSR) Get(_ string, _ policyv1.Target) ([]string, error) {
return nil, nil
}

func (r *fakeSR) HasUpdate(_ string) bool {
return false
}

func (r *fakeSR) Stop(_ string) {
}

func TestShouldHandleSingleKeyFalse(t *testing.T) {
t.Parallel()

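The fakeSR test double above always reports that no namespace update is pending, so these table-driven cases exercise the existing interval logic unchanged. A hypothetical variant (not part of this commit) sketches how the new HasUpdate branch in shouldEvaluatePolicy could be driven from a test:

// Hypothetical test double, not in this commit: it embeds fakeSR and
// overrides HasUpdate so that shouldEvaluatePolicy sees a pending change to
// the policy's selected namespaces.
type fakeSRWithUpdate struct{ fakeSR }

func (r *fakeSRWithUpdate) HasUpdate(_ string) bool {
	return true
}

With a reconciler built on such a fake, a policy that sets spec.namespaceSelector (MatchLabels, MatchExpressions, or a non-empty Include) would be expected to evaluate immediately, without waiting for its evaluationInterval.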
main.go (92 changes: 86 additions & 6 deletions)
@@ -11,6 +11,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"

"github.com/go-logr/zapr"
@@ -20,6 +21,7 @@ import (
corev1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -28,6 +30,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@@ -202,6 +205,18 @@ func main() {
log.Info("Skipping restrictions on the ConfigurationPolicy cache because watchNamespace is empty")
}

nsTransform := func(obj interface{}) (interface{}, error) {
ns := obj.(*corev1.Namespace)
guttedNS := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
Labels: ns.Labels,
},
}

return guttedNS, nil
}

// Set default manager options
options := manager.Options{
MetricsBindAddress: opts.metricsAddr,
@@ -210,7 +225,12 @@
HealthProbeBindAddress: opts.probeAddr,
LeaderElection: opts.enableLeaderElection,
LeaderElectionID: "config-policy-controller.open-cluster-management.io",
NewCache: cache.BuilderWithOptions(cache.Options{SelectorsByObject: cacheSelectors}),
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: cacheSelectors,
TransformByObject: map[client.Object]toolscache.TransformFunc{
&corev1.Namespace{}: nsTransform,
},
}),
// Disable the cache for Secrets to avoid a watch getting created when the `policy-encryption-key`
// Secret is retrieved. Special cache handling is done by the controller.
ClientDisableCacheFor: []client.Object{&corev1.Secret{}},
@@ -252,12 +272,14 @@ func main() {
var targetK8sClient kubernetes.Interface
var targetK8sDynamicClient dynamic.Interface
var targetK8sConfig *rest.Config
var nsSelMgr manager.Manager // A separate controller-manager is needed in hosted mode

if opts.targetKubeConfig == "" {
targetK8sConfig = cfg
targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
} else {
nsSelMgr = mgr
} else { // "Hosted mode"
var err error

targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", opts.targetKubeConfig)
@@ -266,9 +288,24 @@
os.Exit(1)
}

targetK8sConfig.Burst = int(opts.clientBurst)
targetK8sConfig.QPS = opts.clientQPS

targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)

nsSelMgr, err = manager.New(targetK8sConfig, manager.Options{
NewCache: cache.BuilderWithOptions(cache.Options{
TransformByObject: map[client.Object]toolscache.TransformFunc{
&corev1.Namespace{}: nsTransform,
},
}),
})
if err != nil {
log.Error(err, "Unable to create manager from target kube config")
os.Exit(1)
}

log.Info(
"Overrode the target Kubernetes cluster for policy evaluation and enforcement",
"path", opts.targetKubeConfig,
@@ -277,6 +314,14 @@

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

nsSelReconciler := common.NamespaceSelectorReconciler{
Client: nsSelMgr.GetClient(),
}
if err = nsSelReconciler.SetupWithManager(nsSelMgr); err != nil {
log.Error(err, "Unable to create controller", "controller", "NamespaceSelector")
os.Exit(1)
}

reconciler := controllers.ConfigurationPolicyReconciler{
Client: mgr.GetClient(),
DecryptionConcurrency: opts.decryptionConcurrency,
@@ -287,6 +332,7 @@
TargetK8sClient: targetK8sClient,
TargetK8sDynamicClient: targetK8sDynamicClient,
TargetK8sConfig: targetK8sConfig,
SelectorReconciler: &nsSelReconciler,
EnableMetrics: opts.enableMetrics,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
@@ -309,7 +355,7 @@ func main() {
managerCtx, managerCancel := context.WithCancel(context.Background())

// PeriodicallyExecConfigPolicies is the go-routine that periodically checks the policies
log.V(1).Info("Perodically processing Configuration Policies", "frequency", opts.frequency)
log.V(1).Info("Periodically processing Configuration Policies", "frequency", opts.frequency)

go func() {
reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, opts.frequency, mgr.Elected())
@@ -350,10 +396,44 @@ func main() {
log.Info("Addon status reporting is not enabled")
}

log.Info("Starting manager")
log.Info("Starting managers")

var wg sync.WaitGroup
var errorExit bool

wg.Add(1)

go func() {
if err := mgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")

managerCancel()

errorExit = true
}

wg.Done()
}()

if opts.targetKubeConfig != "" { // "hosted mode"
wg.Add(1)

go func() {
if err := nsSelMgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")

managerCancel()

errorExit = true
}

wg.Done()
}()
}

wg.Wait()

if err := mgr.Start(managerCtx); err != nil {
log.Error(err, "Problem running manager")
if errorExit {
os.Exit(1)
}
}
(The remaining 2 changed files in this commit are not shown.)
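The common.NamespaceSelectorReconciler wired up in main.go is implemented in the changed files that are not shown here. As a rough illustration of the watching pattern the commit message describes, a controller-runtime reconciler along the following lines could recompute each registered selection whenever a Namespace changes; every name and detail below is an assumption for illustration, not the commit's actual code, and the selector handling is reduced to plain label matching.

// Illustrative sketch only: the real implementation is in files not shown in
// this view. The structure is assumed from the commit message and the
// SelectorReconciler call sites.
package common

import (
	"context"
	"sort"
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// NamespaceSelectorReconciler remembers, per policy, which namespaces were
// selected, and flags the policy when a Namespace event changes that set.
type NamespaceSelectorReconciler struct {
	Client client.Client

	lock       sync.Mutex
	selections map[string]*selection // keyed by policy name (assumption)
}

// selection holds one policy's cached results. The selector is reduced to a
// label selector here for brevity; the real Target also has Include/Exclude.
type selection struct {
	labelSelector labels.Selector
	namespaces    []string
	hasUpdate     bool
}

// SetupWithManager registers the reconciler for Namespace events on the
// target cluster's manager (the separate manager in hosted mode).
func (r *NamespaceSelectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("namespace-selector").
		For(&corev1.Namespace{}).
		Complete(r)
}

// Reconcile runs on Namespace add/update/delete events and re-evaluates every
// cached selection; HasUpdate would then report true for any policy whose
// selected set changed, prompting shouldEvaluatePolicy to return true.
func (r *NamespaceSelectorReconciler) Reconcile(
	ctx context.Context, _ reconcile.Request,
) (reconcile.Result, error) {
	nsList := &corev1.NamespaceList{}
	if err := r.Client.List(ctx, nsList); err != nil {
		return reconcile.Result{}, err
	}

	r.lock.Lock()
	defer r.lock.Unlock()

	for _, sel := range r.selections {
		matched := []string{}

		for i := range nsList.Items {
			if sel.labelSelector.Matches(labels.Set(nsList.Items[i].Labels)) {
				matched = append(matched, nsList.Items[i].Name)
			}
		}

		sort.Strings(matched)

		if !stringSlicesEqual(matched, sel.namespaces) {
			sel.namespaces = matched
			sel.hasUpdate = true
		}
	}

	return reconcile.Result{}, nil
}

func stringSlicesEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}

	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}

	return true
}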
