Skip to content

Commit

Permalink
Synchronously emit template events
Browse files Browse the repository at this point in the history
Previously, the controller-runtime event recorder was used for these
events. Other policy controllers have moved away from that, for various
reasons. In this case, if a policy went from pending to noncompliant and
back to pending, the event recorder would reuse the "old" pending event
and update only its `lastTimestamp`. Then, if a policy controller
emitted a compliance event within the same second as that Pending event,
the status-sync would see a tie and use the hex-encoded nanoseconds in
the event names to break it. But the reused event's name still dated
from the first time the policy was pending, so the events would be
ordered incorrectly.

Most errors from this synchronous sending can be ignored, because they
occur in paths that are already error cases and will be requeued anyway.

Refs:
 - https://issues.redhat.com/browse/ACM-4699

Signed-off-by: Justin Kulikauskas <jkulikau@redhat.com>
  • Loading branch information
JustinKuli authored and openshift-merge-robot committed Apr 18, 2023
1 parent c01cf0a commit f0e2c60
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 33 deletions.
141 changes: 108 additions & 33 deletions controllers/templatesync/template_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type PolicyReconciler struct {
Recorder record.EventRecorder
ClusterNamespace string
Clientset *kubernetes.Clientset
InstanceName string
DisableGkSync bool
createdGkConstraint *bool
}
Expand Down Expand Up @@ -262,7 +263,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = err
errMsg := fmt.Sprintf("Failed to decode policy template with err: %s", err)

r.emitTemplateError(instance, tIndex, fmt.Sprintf("[template %v]", tIndex), false, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, fmt.Sprintf("[template %v]", tIndex), false, errMsg)
reqLogger.Error(resultError, "Failed to decode the policy template", "templateIndex", tIndex)

policyUserErrorsCounter.WithLabelValues(instance.Name, "", "format-error").Inc()
Expand Down Expand Up @@ -310,7 +312,9 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = fmt.Errorf("dependency on %s has conflicting compliance states", dep.Name)
errMsg := fmt.Sprintf("Failed to decode policy template with err: %s", resultError)

r.emitTemplateError(instance, tIndex, fmt.Sprintf("[template %v]", tIndex), isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex,
fmt.Sprintf("[template %v]", tIndex), isClusterScoped, errMsg)
reqLogger.Error(resultError, "Failed to decode the policy template", "templateIndex", tIndex)

depConflictErr = true
Expand Down Expand Up @@ -338,7 +342,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
errMsg := fmt.Sprintf("Failed to get name from policy template at index %v", tIndex)
resultError = k8serrors.NewBadRequest(errMsg)

r.emitTemplateError(instance, tIndex, fmt.Sprintf("[template %v]", tIndex), isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, fmt.Sprintf("[template %v]", tIndex), isClusterScoped, errMsg)
reqLogger.Error(resultError, "Failed to process the policy template", "templateIndex", tIndex)

policyUserErrorsCounter.WithLabelValues(instance.Name, "", "format-error").Inc()
Expand Down Expand Up @@ -367,7 +372,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

errMsg += fmt.Sprintf(": %s", err)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(err, "Could not find an API mapping for the object definition",
"group", gvk.Group,
"version", gvk.Version,
Expand Down Expand Up @@ -415,7 +421,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

resultError = err

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(err, "Unsupported policy-template kind found in object definition",
"group", gvk.Group,
"version", gvk.Version,
Expand All @@ -435,7 +442,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
errMsg := fmt.Sprintf("Templates are not supported for kind : %s", gvk.Kind)
resultError = k8serrors.NewBadRequest(errMsg)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(resultError, "Failed to process the policy template")

policyUserErrorsCounter.WithLabelValues(instance.Name, tName, "format-error").Inc()
Expand Down Expand Up @@ -464,7 +472,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = err
errMsg := fmt.Sprintf("Failed to unmarshal the policy template: %s", err)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(resultError, "Failed to unmarshal the policy template")

policySystemErrorsCounter.WithLabelValues(instance.Name, tName, "unmarshal-error").Inc()
Expand All @@ -486,7 +495,14 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
if err != nil {
if len(dependencyFailures) > 0 {
// template must be pending, do not create it
r.emitTemplatePending(instance, tIndex, tName, isClusterScoped, generatePendingMsg(dependencyFailures))
emitErr := r.emitTemplatePending(ctx, instance, tIndex, tName, isClusterScoped,
generatePendingMsg(dependencyFailures))
if emitErr != nil {
resultError = emitErr

continue
}

tLogger.Info("Dependencies were not satisfied for the policy template",
"namespace", instance.GetNamespace(),
"kind", gvk.Kind,
Expand Down Expand Up @@ -530,7 +546,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = err
errMsg := fmt.Sprintf("Failed to create policy template: %s", err)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(resultError, "Failed to create policy template")

policySystemErrorsCounter.WithLabelValues(instance.Name, tName, "create-error").Inc()
Expand All @@ -555,7 +572,11 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
if isGkConstraintTemplate {
tLogger.Info("Emitting status event for " + gvk.Kind)
msg := fmt.Sprintf("%s %s was created successfully", gvk.Kind, tName)
r.emitTemplateSuccess(instance, tIndex, tName, isClusterScoped, msg)

emitErr := r.emitTemplateSuccess(ctx, instance, tIndex, tName, isClusterScoped, msg)
if emitErr != nil {
resultError = emitErr
}
}
}

Expand All @@ -574,7 +595,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = err
errMsg := fmt.Sprintf("Failed to get the object in the policy template: %s", err)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(err, "Failed to get the object in the policy template",
"namespace", instance.GetNamespace(),
"kind", gvk.Kind,
Expand All @@ -588,12 +610,17 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

if len(dependencyFailures) > 0 {
// template must be pending, need to delete it and error
r.emitTemplatePending(instance, tIndex, tName, isClusterScoped, generatePendingMsg(dependencyFailures))
tLogger.Info("Dependencies were not satisfied for the policy template",
"namespace", instance.GetNamespace(),
"kind", gvk.Kind,
)

emitErr := r.emitTemplatePending(ctx, instance, tIndex, tName,
isClusterScoped, generatePendingMsg(dependencyFailures))
if emitErr != nil {
resultError = err
}

err = res.Delete(ctx, tName, metav1.DeleteOptions{})
if err != nil {
tLogger.Error(err, "Failed to delete a template that entered pending state",
Expand Down Expand Up @@ -665,7 +692,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

resultError = k8serrors.NewBadRequest(errMsg)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(resultError, "Failed to create the policy template")

policyUserErrorsCounter.WithLabelValues(instance.Name, tName, "format-error").Inc()
Expand Down Expand Up @@ -711,7 +739,8 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
resultError = err
errMsg := fmt.Sprintf("Failed to update policy template %s: %s", tName, err)

r.emitTemplateError(instance, tIndex, tName, isClusterScoped, errMsg)
//nolint:errcheck // it will already be requeued for the resultError.
r.emitTemplateError(ctx, instance, tIndex, tName, isClusterScoped, errMsg)
tLogger.Error(err, "Failed to update the policy template")

policySystemErrorsCounter.WithLabelValues(instance.Name, tName, "patch-error").Inc()
Expand All @@ -734,7 +763,11 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
if isGkConstraintTemplate {
tLogger.Info("Emitting status event for " + gvk.Kind)
msg := fmt.Sprintf("%s %s was updated successfully", gvk.Kind, tName)
r.emitTemplateSuccess(instance, tIndex, tName, isClusterScoped, msg)

emitErr := r.emitTemplateSuccess(ctx, instance, tIndex, tName, isClusterScoped, msg)
if emitErr != nil {
resultError = emitErr
}
}
}

Expand Down Expand Up @@ -1186,26 +1219,39 @@ func overrideRemediationAction(instance *policiesv1.Policy, tObjectUnstructured
// policy framework. If the policy's status already reflects the current message, then no actions
// are taken.
func (r *PolicyReconciler) emitTemplateSuccess(
pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, msg string,
) {
r.emitTemplateEvent(pol, tIndex, tName, clusterScoped, "Normal", "Compliant; ", msg)
ctx context.Context, pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, msg string,
) error {
err := r.emitTemplateEvent(ctx, pol, tIndex, tName, clusterScoped, "Normal", "Compliant; ", msg)
if err != nil {
tlog := log.WithValues("Policy.Namespace", pol.Namespace, "Policy.Name", pol.Name, "template", tName)
tlog.Error(err, "Failed to emit template success event")
}

return err
}

// emitTemplateError performs actions that ensure correct reporting of template errors in the
// policy framework. If the policy's status already reflects the current error, then no actions
// are taken.
func (r *PolicyReconciler) emitTemplateError(
pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, errMsg string,
) {
r.emitTemplateEvent(pol, tIndex, tName, clusterScoped, "Warning", "NonCompliant; template-error; ", errMsg)
ctx context.Context, pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, errMsg string,
) error {
err := r.emitTemplateEvent(ctx, pol, tIndex, tName, clusterScoped,
"Warning", "NonCompliant; template-error; ", errMsg)
if err != nil {
tlog := log.WithValues("Policy.Namespace", pol.Namespace, "Policy.Name", pol.Name, "template", tName)
tlog.Error(err, "Failed to emit template error event")
}

return err
}

// emitTemplatePending performs actions that ensure correct reporting of pending dependencies in the
// policy framework. If the policy's status already reflects the current status, then no actions
// are taken.
func (r *PolicyReconciler) emitTemplatePending(
pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, msg string,
) {
ctx context.Context, pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool, msg string,
) error {
msgMeta := "Pending; "
eventType := "Warning"

Expand All @@ -1215,33 +1261,62 @@ func (r *PolicyReconciler) emitTemplatePending(
eventType = "Normal"
}

r.emitTemplateEvent(pol, tIndex, tName, clusterScoped, eventType, msgMeta, msg)
err := r.emitTemplateEvent(ctx, pol, tIndex, tName, clusterScoped, eventType, msgMeta, msg)
if err != nil {
tlog := log.WithValues("Policy.Namespace", pol.Namespace, "Policy.Name", pol.Name, "template", tName)
tlog.Error(err, "Failed to emit template pending event")
}

return err
}

// emitTemplateEvent performs actions that ensure correct reporting of template sync events. If the
// policy's status already reflects the current status, then no actions are taken. The msgMeta and
// msg are concatenated without spaces, so any spacing should be included inside the msgMeta string.
func (r *PolicyReconciler) emitTemplateEvent(
pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool,
ctx context.Context, pol *policiesv1.Policy, tIndex int, tName string, clusterScoped bool,
eventType string, msgMeta string, msg string,
) {
) error {
// check if the error is already present in the policy status - if so, return early
if strings.Contains(getLatestStatusMessage(pol, tIndex), msgMeta+msg) {
return
return nil
}

// emit the non-compliance event
// emit an informational event
r.Recorder.Event(pol, eventType, "PolicyTemplateSync", msg)

// emit the compliance event
var policyComplianceReason string
if clusterScoped {
policyComplianceReason = fmt.Sprintf(utils.PolicyClusterScopedFmtStr, tName)
policyComplianceReason = utils.EventReason("", tName)
} else {
policyComplianceReason = fmt.Sprintf(utils.PolicyFmtStr, pol.GetNamespace(), tName)
policyComplianceReason = utils.EventReason(pol.GetNamespace(), tName)
}

r.Recorder.Event(pol, eventType, policyComplianceReason, msgMeta+msg)
sender := utils.ComplianceEventSender{
ClusterNamespace: pol.Namespace,
InstanceName: r.InstanceName,
ClientSet: r.Clientset,
ControllerName: ControllerName,
}

// emit an informational event
r.Recorder.Event(pol, eventType, "PolicyTemplateSync", msg)
ownerref := metav1.OwnerReference{
APIVersion: pol.APIVersion,
Kind: pol.Kind,
Name: pol.Name,
UID: pol.UID,
}

var compState policiesv1.ComplianceState
if strings.HasPrefix(msgMeta, "Compliant") {
compState = policiesv1.Compliant
} else if strings.HasPrefix(msgMeta, "NonCompliant") {
compState = policiesv1.NonCompliant
} else if strings.HasPrefix(msgMeta, "Pending") {
compState = policiesv1.Pending
}

return sender.SendEvent(ctx, nil, ownerref, policyComplianceReason, msgMeta+msg, compState)
}

// handleSyncSuccess performs common actions that should be run whenever a template is in sync,
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ func getManager(
os.Exit(1)
}

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

templateReconciler := &templatesync.PolicyReconciler{
Client: mgr.GetClient(),
DynamicWatcher: watcher,
Expand All @@ -459,6 +461,7 @@ func getManager(
Recorder: mgr.GetEventRecorderFor(templatesync.ControllerName),
ClusterNamespace: tool.Options.ClusterNamespace,
Clientset: kubernetes.NewForConfigOrDie(mgr.GetConfig()),
InstanceName: instanceName,
DisableGkSync: tool.Options.DisableGkSync,
}

Expand Down

0 comments on commit f0e2c60

Please sign in to comment.