diff --git a/runtime/controller/events.go b/runtime/controller/events.go index 7889ab9e8..becdd0b60 100644 --- a/runtime/controller/events.go +++ b/runtime/controller/events.go @@ -19,11 +19,9 @@ package controller import ( "context" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" kuberecorder "k8s.io/client-go/tools/record" - "k8s.io/client-go/tools/reference" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -80,18 +78,10 @@ func (e Events) EventWithMeta(ctx context.Context, obj client.Object, metadata m // EventWithMetaf emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured. func (e Events) EventWithMetaf(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msgFmt string, args ...interface{}) { if e.EventRecorder != nil { - e.EventRecorder.Eventf(obj, severityToEventType(severity), reason, msgFmt, args...) + e.EventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...) } if e.ExternalEventRecorder != nil { - ref, err := reference.GetReference(e.Scheme, obj) - if err != nil { - logr.FromContextOrDiscard(ctx).Error(err, "unable to get object reference to send event") - return - } - if err := e.ExternalEventRecorder.Eventf(*ref, metadata, severity, reason, msgFmt, args...); err != nil { - logr.FromContextOrDiscard(ctx).Error(err, "unable to send event") - return - } + e.ExternalEventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...) } } diff --git a/runtime/events/event.go b/runtime/events/event.go index 467b2cbda..419dbe923 100644 --- a/runtime/events/event.go +++ b/runtime/events/event.go @@ -34,6 +34,9 @@ const ( EventSeverityError string = "error" ) +// EventTypeTrace represents a trace event. +const EventTypeTrace string = "Trace" + // Event is a report of an event issued by a controller. // +kubebuilder:object:generate=true type Event struct { diff --git a/runtime/events/recorder.go b/runtime/events/recorder.go index b7ff0260c..dd9f7d319 100644 --- a/runtime/events/recorder.go +++ b/runtime/events/recorder.go @@ -25,9 +25,13 @@ import ( "os" "time" + "github.com/go-logr/logr" "github.com/hashicorp/go-retryablehttp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" ) // Recorder posts events to the webhook address. @@ -40,11 +44,19 @@ type Recorder struct { // Retryable HTTP client. Client *retryablehttp.Client + + // Scheme of the recorded objects. + Scheme *runtime.Scheme + + // Log is the recorder logger. + Log logr.Logger } +var _ kuberecorder.EventRecorder = &Recorder{} + // NewRecorder creates an event Recorder with default settings. // The recorder performs automatic retries for connection errors and 500-range response codes. -func NewRecorder(webhook, reportingController string) (*Recorder, error) { +func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingController string) (*Recorder, error) { if _, err := url.Parse(webhook); err != nil { return nil, err } @@ -55,89 +67,110 @@ func NewRecorder(webhook, reportingController string) (*Recorder, error) { httpClient.Logger = nil return &Recorder{ + Scheme: scheme, Webhook: webhook, ReportingController: reportingController, Client: httpClient, + Log: log, }, nil } -// EventInfof records an event with information severity. -func (r *Recorder) EventInfof( - object corev1.ObjectReference, - metadata map[string]string, - reason string, messageFmt string, args ...interface{}) error { - return r.Eventf(object, metadata, EventSeverityInfo, reason, messageFmt, args...) +// Event records an event in the webhook address. +func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) { + r.AnnotatedEventf(object, nil, eventtype, reason, message) } -// EventErrorf records an event with error severity. -func (r *Recorder) EventErrorf( - object corev1.ObjectReference, - metadata map[string]string, - reason string, messageFmt string, args ...interface{}) error { - return r.Eventf(object, metadata, EventSeverityError, reason, messageFmt, args...) +// Event records an event in the webhook address. +func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...) } -// Eventf constructs an event from the given information and performs a HTTP POST to the webhook address. -func (r *Recorder) Eventf( - object corev1.ObjectReference, - metadata map[string]string, - severity, reason string, - messageFmt string, args ...interface{}) error { +// AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address. +func (r *Recorder) AnnotatedEventf( + object runtime.Object, + annotations map[string]string, + eventtype, reason string, + messageFmt string, args ...interface{}) { + + ref, err := reference.GetReference(r.Scheme, object) + if err != nil { + r.Log.Error(err, "failed to get object reference") + } + + // Convert the eventType to severity. + severity := eventTypeToSeverity(eventtype) // Do not send trace events to notification controller, // traces are persisted as Kubernetes events only. if severity == EventSeverityTrace { - return nil + return } if r.Client == nil { - return fmt.Errorf("retryable HTTP client has not been initialized") + r.Log.Error(nil, "retryable HTTP client has not been initialized") + return } message := fmt.Sprintf(messageFmt, args...) - if object.Kind == "" { - return fmt.Errorf("failed to get object kind") + if ref.Kind == "" { + r.Log.Error(nil, "failed to get object kind") + return } - if object.Name == "" { - return fmt.Errorf("failed to get object name") + if ref.Name == "" { + r.Log.Error(nil, "failed to get object name") + return } - if object.Namespace == "" { - return fmt.Errorf("failed to get object namespace") + if ref.Namespace == "" { + r.Log.Error(nil, "failed to get object namespace") + return } hostname, err := os.Hostname() if err != nil { - return err + r.Log.Error(err, "failed to get hostname") + return } event := Event{ - InvolvedObject: object, + InvolvedObject: *ref, Severity: severity, Timestamp: metav1.Now(), Message: message, Reason: reason, - Metadata: metadata, + Metadata: annotations, ReportingController: r.ReportingController, ReportingInstance: hostname, } body, err := json.Marshal(event) if err != nil { - return fmt.Errorf("failed to marshal object into json, error: %w", err) + r.Log.Error(err, "failed to marshal object into json") + return } // avoid retrying rate limited requests if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil && (res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) { - return nil + return } if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil { - return err + return } +} - return nil +// eventTypeToSeverity maps the given eventType string to a gotk event severity +// type. +func eventTypeToSeverity(eventType string) string { + switch eventType { + case corev1.EventTypeWarning: + return EventSeverityError + case EventTypeTrace: + return EventSeverityTrace + default: + return EventSeverityInfo + } } diff --git a/runtime/events/recorder_test.go b/runtime/events/recorder_test.go index 12c2439c3..36918f4eb 100644 --- a/runtime/events/recorder_test.go +++ b/runtime/events/recorder_test.go @@ -25,10 +25,15 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" ) -func TestEventRecorder_Eventf(t *testing.T) { +func TestEventRecorder_AnnotatedEventf(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -36,7 +41,7 @@ func TestEventRecorder_Eventf(t *testing.T) { err = json.Unmarshal(b, &payload) require.NoError(t, err) - require.Equal(t, "GitRepository", payload.InvolvedObject.Kind) + require.Equal(t, "ConfigMap", payload.InvolvedObject.Kind) require.Equal(t, "webapp", payload.InvolvedObject.Name) require.Equal(t, "gitops-system", payload.InvolvedObject.Namespace) require.Equal(t, "true", payload.Metadata["test"]) @@ -45,24 +50,31 @@ func TestEventRecorder_Eventf(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" + meta := map[string]string{ "test": "true", } - err = eventRecorder.EventInfof(obj, meta, "sync", "sync %s", obj.Name) - require.NoError(t, err) + eventRecorder.AnnotatedEventf(obj, meta, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + require.Equal(t, 2, requestCount) + + // When a trace event is sent, it's dropped, no new request. + eventRecorder.AnnotatedEventf(obj, meta, EventTypeTrace, "sync", "sync %s", obj.Name) + require.Equal(t, 2, requestCount) } -func TestEventRecorder_Eventf_Retry(t *testing.T) { +func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -74,22 +86,24 @@ func TestEventRecorder_Eventf_Retry(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) eventRecorder.Client.RetryMax = 2 - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" - err = eventRecorder.EventErrorf(obj, nil, "sync", "sync %s", obj.Name) - require.Error(t, err) + eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + require.True(t, requestCount > 1) } -func TestEventRecorder_Eventf_RateLimited(t *testing.T) { +func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -101,16 +115,16 @@ func TestEventRecorder_Eventf_RateLimited(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) eventRecorder.Client.RetryMax = 2 - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" - err = eventRecorder.EventInfof(obj, nil, "sync", "sync %s", obj.Name) - require.NoError(t, err) + eventRecorder.AnnotatedEventf(obj, nil, "sync", "sync %s", obj.Name) + require.Equal(t, 1, requestCount) }