Skip to content

Commit

Permalink
Record compliance events in the compliance history API with nanoseconds
Browse files Browse the repository at this point in the history
This helps with ordering when compliance events have a timestamp that is
the same down to the second.

Relates:
https://issues.redhat.com/browse/ACM-10155

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
  • Loading branch information
mprahl authored and openshift-merge-bot[bot] committed Feb 28, 2024
1 parent a332e0a commit a3c206c
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
55 changes: 35 additions & 20 deletions controllers/statussync/policy_status_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -21,7 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -124,7 +125,7 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

err := r.ManagedClient.Get(ctx, request.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
if k8serrors.IsNotFound(err) {
// The replicated policy on the managed cluster was deleted.
// check if it was deleted by user by checking if it still exists on hub
hubInstance := &policiesv1.Policy{}
Expand All @@ -133,7 +134,7 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
ctx, types.NamespacedName{Namespace: r.ClusterNamespaceOnHub, Name: request.Name}, hubInstance,
)
if err != nil {
if errors.IsNotFound(err) {
if k8serrors.IsNotFound(err) {
// confirmed deleted on hub, doing nothing
reqLogger.Info("Policy was deleted, no status to update")

Expand Down Expand Up @@ -172,11 +173,11 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
err = r.HubClient.Get(ctx, types.NamespacedName{Namespace: r.ClusterNamespaceOnHub, Name: request.Name}, hubPlc)
if err != nil {
// hub policy not found, it has been deleted
if errors.IsNotFound(err) {
if k8serrors.IsNotFound(err) {
reqLogger.Info("Hub policy not found, it has been deleted")
// try to delete local one
err = r.ManagedClient.Delete(ctx, instance)
if err == nil || errors.IsNotFound(err) {
if err == nil || k8serrors.IsNotFound(err) {
// no err or err is not found means local policy has been deleted
reqLogger.Info("Managed policy was deleted")

Expand Down Expand Up @@ -355,32 +356,25 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ

return !history[i].eventTime.Before(&history[j].eventTime)
}
// Timestamps are the same: attempt to use the event name.
// Conventionally (in client-go), the event name has a hexadecimal
// nanosecond timestamp as a suffix after a period.
iNameParts := strings.Split(history[i].EventName, ".")
jNameParts := strings.Split(history[j].EventName, ".")
errMsg := "Unable to interpret hexadecimal timestamp in event name, " +
"can't guarantee ordering of events in this status"

iNanos, err := strconv.ParseInt(iNameParts[len(iNameParts)-1], 16, 64)

iTime, err := parseTimestampFromEventName(history[i].EventName)
if err != nil {
reqLogger.Error(err, errMsg, "eventName", history[i].EventName)
reqLogger.Error(err, "Can't guarantee ordering of events in this status")

return false
}

jNanos, err := strconv.ParseInt(jNameParts[len(jNameParts)-1], 16, 64)
jTime, err := parseTimestampFromEventName(history[j].EventName)
if err != nil {
reqLogger.Error(err, errMsg, "eventName", history[j].EventName)
reqLogger.Error(err, "Can't guarantee ordering of events in this status")

return false
}

reqLogger.V(2).Info("Event timestamp collision, order determined by hex timestamp in name",
"event1Name", history[i].EventName, "event2Name", history[j].EventName)

return iNanos > jNanos
return iTime.After(jTime.Time)
}

return !history[i].LastTimestamp.Time.Before(history[j].LastTimestamp.Time)
Expand Down Expand Up @@ -495,6 +489,19 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
return reconcile.Result{}, nil
}

// parseTimestampFromEventName will parse the event name for a hexadecimal nanosecond timestamp as a suffix after a
// period. This is a client-go convention that is repeated in the policy framework.
func parseTimestampFromEventName(eventName string) (metav1.Time, error) {
nameParts := strings.Split(eventName, ".")

nanos, err := strconv.ParseInt(nameParts[len(nameParts)-1], 16, 64)
if err != nil {
return metav1.Time{}, errors.New("Unable to find a valid hexadecimal timestamp in event name: " + eventName)
}

return metav1.Unix(0, nanos), nil
}

func parseComplianceFromMessage(message string) policiesv1.ComplianceState {
cleanMsg := strings.ToLower(
strings.TrimSpace(
Expand Down Expand Up @@ -535,10 +542,18 @@ func ceRequestFromEvent(event *corev1.Event) (utils.ComplianceAPIEventRequest, e

compliance := parseComplianceFromMessage(event.Message)

var timestamp metav1.Time

if timestampFromEvent, err := parseTimestampFromEventName(event.Name); err == nil {
timestamp = timestampFromEvent
} else {
timestamp = event.LastTimestamp
}

ce.Event = utils.ComplianceAPIEvent{
Compliance: compliance,
Message: strings.TrimLeft(event.Message[len(compliance):], " ;"),
Timestamp: event.LastTimestamp.Format(time.RFC3339Nano),
Timestamp: timestamp.Format(time.RFC3339Nano),
ReportedBy: "governance-policy-framework",
}

Expand Down Expand Up @@ -571,7 +586,7 @@ func StartComplianceEventsSyncer(
var clusterID string

idClusterClaim, err := managedClient.Resource(clusterClaimGVR).Get(ctx, "id.k8s.io", metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
if err != nil && !k8serrors.IsNotFound(err) {
return err
}

Expand Down
20 changes: 20 additions & 0 deletions controllers/statussync/policy_status_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Contributors to the Open Cluster Management project
package statussync

import (
"testing"
)

func TestParseTimestampFromEventName(t *testing.T) {
output, err := parseTimestampFromEventName("event.17b80d88a995e12c")
if err != nil {
t.Errorf("Expected no error but got: %v", err)
} else if output.Time.UnixNano() != 1709130939198988588 {
t.Errorf("Expected 1709130939198988588 but got: %d", output.Time.UnixNano())
}

output, err = parseTimestampFromEventName("event.with-no-timestamp")
if err == nil {
t.Errorf("Expected an error but got none: %s", output)
}
}
9 changes: 5 additions & 4 deletions test/e2e/case23_compliance_api_recording_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,8 @@ var _ = Describe("Compliance API recording", Ordered, Label("compliance-events-a
})

AfterAll(func(ctx context.Context) {
err := server.Shutdown(ctx)
Expect(err).ToNot(HaveOccurred())

By("Deleting a policy on hub cluster in ns:" + clusterNamespaceOnHub)
_, err = kubectlHub("delete", "-f", yamlPath, "-n", clusterNamespaceOnHub, "--ignore-not-found")
_, err := kubectlHub("delete", "-f", yamlPath, "-n", clusterNamespaceOnHub, "--ignore-not-found")
Expect(err).ToNot(HaveOccurred())
opt := metav1.ListOptions{}
utils.ListWithTimeout(clientHubDynamic, gvrPolicy, opt, 0, true, defaultTimeoutSeconds)
Expand All @@ -167,6 +164,10 @@ var _ = Describe("Compliance API recording", Ordered, Label("compliance-events-a
By("clean up all events")
_, err = kubectlManaged("delete", "events", "-n", clusterNamespace, "--all")
Expect(err).ShouldNot(HaveOccurred())

// Shutdown after all clean up is done in case there were some events queued up by the controllers.
err = server.Shutdown(ctx)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
Expand Down

0 comments on commit a3c206c

Please sign in to comment.