diff --git a/runtime/controller/events.go b/runtime/controller/events.go deleted file mode 100644 index 7889ab9e..00000000 --- a/runtime/controller/events.go +++ /dev/null @@ -1,107 +0,0 @@ -/* -Copyright 2020 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - kuberecorder "k8s.io/client-go/tools/record" - "k8s.io/client-go/tools/reference" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/fluxcd/pkg/runtime/events" -) - -// Events is a helper struct that adds the capability of sending events to the Kubernetes API and an external event -// recorder, like the GitOps Toolkit notification-controller. -// -// Use it by embedding it in your reconciler struct: -// -// type MyTypeReconciler { -// client.Client -// // ... etc. -// controller.Events -// } -// -// Use MakeEvents to create a working Events value; in most cases the value needs to be initialised just once per -// controller, as the specialised logger and object reference data are gathered from the arguments provided to the -// Eventf method. -type Events struct { - Scheme *runtime.Scheme - EventRecorder kuberecorder.EventRecorder - ExternalEventRecorder *events.Recorder -} - -// MakeEvents creates a new Events, with the Events.Scheme set to that of the given mgr and a newly initialised -// Events.EventRecorder for the given controllerName. -func MakeEvents(mgr ctrl.Manager, controllerName string, ext *events.Recorder) Events { - return Events{ - Scheme: mgr.GetScheme(), - EventRecorder: mgr.GetEventRecorderFor(controllerName), - ExternalEventRecorder: ext, - } -} - -// Event emits a Kubernetes event, and forwards the event to the ExternalEventRecorder if configured. -// Use EventWithMeta or EventWithMetaf if you want to attach metadata to the external event. -func (e Events) Event(ctx context.Context, obj client.Object, severity, reason, msg string) { - e.EventWithMetaf(ctx, obj, nil, severity, reason, msg) -} - -// Eventf emits a Kubernetes event, and forwards the event to the ExternalEventRecorder if configured. -// Use EventWithMeta or EventWithMetaf if you want to attach metadata to the external event. -func (e Events) Eventf(ctx context.Context, obj client.Object, severity, reason, msgFmt string, args ...interface{}) { - e.EventWithMetaf(ctx, obj, nil, severity, reason, msgFmt, args...) -} - -// EventWithMeta emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured. -func (e Events) EventWithMeta(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msg string) { - e.EventWithMetaf(ctx, obj, metadata, severity, reason, msg) -} - -// EventWithMetaf emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured. -func (e Events) EventWithMetaf(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msgFmt string, args ...interface{}) { - if e.EventRecorder != nil { - e.EventRecorder.Eventf(obj, severityToEventType(severity), reason, msgFmt, args...) - } - if e.ExternalEventRecorder != nil { - ref, err := reference.GetReference(e.Scheme, obj) - if err != nil { - logr.FromContextOrDiscard(ctx).Error(err, "unable to get object reference to send event") - return - } - if err := e.ExternalEventRecorder.Eventf(*ref, metadata, severity, reason, msgFmt, args...); err != nil { - logr.FromContextOrDiscard(ctx).Error(err, "unable to send event") - return - } - } -} - -// severityToEventType maps the given severity string to a corev1 EventType. -// In case of an unrecognised severity, EventTypeNormal is returned. -func severityToEventType(severity string) string { - switch severity { - case events.EventSeverityError: - return corev1.EventTypeWarning - default: - return corev1.EventTypeNormal - } -} diff --git a/runtime/events/event.go b/runtime/events/event.go index 467b2cbd..419dbe92 100644 --- a/runtime/events/event.go +++ b/runtime/events/event.go @@ -34,6 +34,9 @@ const ( EventSeverityError string = "error" ) +// EventTypeTrace represents a trace event. +const EventTypeTrace string = "Trace" + // Event is a report of an event issued by a controller. // +kubebuilder:object:generate=true type Event struct { diff --git a/runtime/events/recorder.go b/runtime/events/recorder.go index b7ff0260..6f650cfc 100644 --- a/runtime/events/recorder.go +++ b/runtime/events/recorder.go @@ -25,12 +25,34 @@ import ( "os" "time" + "github.com/go-logr/logr" "github.com/hashicorp/go-retryablehttp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" + ctrl "sigs.k8s.io/controller-runtime" ) -// Recorder posts events to the webhook address. +// Recorder posts events to the Kubernetes API and any other event recorder webhook address, like the GitOps Toolkit +// notification-controller. +// +// Use it by embedding EventRecorder in reconciler struct: +// +// import ( +// ... +// kuberecorder "k8s.io/client-go/tools/record" +// ... +// ) +// +// type MyTypeReconciler { +// client.Client +// // ... etc. +// kuberecorder.EventRecorder +// } +// +// Use NewRecorder to create a working Recorder. type Recorder struct { // URL address of the events endpoint. Webhook string @@ -40,11 +62,23 @@ type Recorder struct { // Retryable HTTP client. Client *retryablehttp.Client + + // EventRecorder is the Kubernetes event recorder. + EventRecorder kuberecorder.EventRecorder + + // Scheme to look up the recorded objects. + Scheme *runtime.Scheme + + // Log is the recorder logger. + Log logr.Logger } -// NewRecorder creates an event Recorder with default settings. -// The recorder performs automatic retries for connection errors and 500-range response codes. -func NewRecorder(webhook, reportingController string) (*Recorder, error) { +var _ kuberecorder.EventRecorder = &Recorder{} + +// NewRecorder creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the +// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the +// external recorder. +func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) { if _, err := url.Parse(webhook); err != nil { return nil, err } @@ -55,89 +89,123 @@ func NewRecorder(webhook, reportingController string) (*Recorder, error) { httpClient.Logger = nil return &Recorder{ + Scheme: mgr.GetScheme(), Webhook: webhook, ReportingController: reportingController, Client: httpClient, + EventRecorder: mgr.GetEventRecorderFor(reportingController), + Log: log, }, nil } -// EventInfof records an event with information severity. -func (r *Recorder) EventInfof( - object corev1.ObjectReference, - metadata map[string]string, - reason string, messageFmt string, args ...interface{}) error { - return r.Eventf(object, metadata, EventSeverityInfo, reason, messageFmt, args...) +// Event records an event in the webhook address. +func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) { + r.AnnotatedEventf(object, nil, eventtype, reason, message) } -// EventErrorf records an event with error severity. -func (r *Recorder) EventErrorf( - object corev1.ObjectReference, - metadata map[string]string, - reason string, messageFmt string, args ...interface{}) error { - return r.Eventf(object, metadata, EventSeverityError, reason, messageFmt, args...) +// Event records an event in the webhook address. +func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...) } -// Eventf constructs an event from the given information and performs a HTTP POST to the webhook address. -func (r *Recorder) Eventf( - object corev1.ObjectReference, - metadata map[string]string, - severity, reason string, - messageFmt string, args ...interface{}) error { +// AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address. +func (r *Recorder) AnnotatedEventf( + object runtime.Object, + annotations map[string]string, + eventtype, reason string, + messageFmt string, args ...interface{}) { + + ref, err := reference.GetReference(r.Scheme, object) + if err != nil { + r.Log.Error(err, "failed to get object reference") + } + + // Add object info in the logger. + log := r.Log.WithValues("name", ref.Name, "namespace", ref.Namespace, "reconciler kind", ref.Kind) + + // Convert the eventType to severity. + severity := eventTypeToSeverity(eventtype) // Do not send trace events to notification controller, - // traces are persisted as Kubernetes events only. + // traces are persisted as Kubernetes events only as normal events. if severity == EventSeverityTrace { - return nil + r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...) + return } + // Forward the event to the Kubernetes recorder. + r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...) + if r.Client == nil { - return fmt.Errorf("retryable HTTP client has not been initialized") + err := fmt.Errorf("retryable HTTP client has not been initialized") + log.Error(err, "unable to record event") + return } message := fmt.Sprintf(messageFmt, args...) - if object.Kind == "" { - return fmt.Errorf("failed to get object kind") + if ref.Kind == "" { + err := fmt.Errorf("failed to get object kind") + log.Error(err, "unable to record event") + return } - if object.Name == "" { - return fmt.Errorf("failed to get object name") + if ref.Name == "" { + err := fmt.Errorf("failed to get object name") + log.Error(err, "unable to record event") + return } - if object.Namespace == "" { - return fmt.Errorf("failed to get object namespace") + if ref.Namespace == "" { + err := fmt.Errorf("failed to get object namespace") + log.Error(err, "unable to record event") + return } hostname, err := os.Hostname() if err != nil { - return err + log.Error(err, "failed to get hostname") + return } event := Event{ - InvolvedObject: object, + InvolvedObject: *ref, Severity: severity, Timestamp: metav1.Now(), Message: message, Reason: reason, - Metadata: metadata, + Metadata: annotations, ReportingController: r.ReportingController, ReportingInstance: hostname, } body, err := json.Marshal(event) if err != nil { - return fmt.Errorf("failed to marshal object into json, error: %w", err) + log.Error(err, "failed to marshal object into json") + return } // avoid retrying rate limited requests if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil && (res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) { - return nil + return } if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil { - return err + log.Error(err, "unable to record event") + return } +} - return nil +// eventTypeToSeverity maps the given eventType string to a GOTK event severity +// type. +func eventTypeToSeverity(eventType string) string { + switch eventType { + case corev1.EventTypeWarning: + return EventSeverityError + case EventTypeTrace: + return EventSeverityTrace + default: + return EventSeverityInfo + } } diff --git a/runtime/events/recorder_test.go b/runtime/events/recorder_test.go index 12c2439c..2c539fd0 100644 --- a/runtime/events/recorder_test.go +++ b/runtime/events/recorder_test.go @@ -25,10 +25,13 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" ) -func TestEventRecorder_Eventf(t *testing.T) { +func TestEventRecorder_AnnotatedEventf(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -36,7 +39,7 @@ func TestEventRecorder_Eventf(t *testing.T) { err = json.Unmarshal(b, &payload) require.NoError(t, err) - require.Equal(t, "GitRepository", payload.InvolvedObject.Kind) + require.Equal(t, "ConfigMap", payload.InvolvedObject.Kind) require.Equal(t, "webapp", payload.InvolvedObject.Name) require.Equal(t, "gitops-system", payload.InvolvedObject.Namespace) require.Equal(t, "true", payload.Metadata["test"]) @@ -45,24 +48,29 @@ func TestEventRecorder_Eventf(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" + meta := map[string]string{ "test": "true", } - err = eventRecorder.EventInfof(obj, meta, "sync", "sync %s", obj.Name) - require.NoError(t, err) + eventRecorder.AnnotatedEventf(obj, meta, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + require.Equal(t, 2, requestCount) + + // When a trace event is sent, it's dropped, no new request. + eventRecorder.AnnotatedEventf(obj, meta, EventTypeTrace, "sync", "sync %s", obj.Name) + require.Equal(t, 2, requestCount) } -func TestEventRecorder_Eventf_Retry(t *testing.T) { +func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -74,22 +82,22 @@ func TestEventRecorder_Eventf_Retry(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) eventRecorder.Client.RetryMax = 2 - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" - err = eventRecorder.EventErrorf(obj, nil, "sync", "sync %s", obj.Name) - require.Error(t, err) + eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + require.True(t, requestCount > 1) } -func TestEventRecorder_Eventf_RateLimited(t *testing.T) { +func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) { + requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ b, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -101,16 +109,14 @@ func TestEventRecorder_Eventf_RateLimited(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(ts.URL, "test-controller") + eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller") require.NoError(t, err) eventRecorder.Client.RetryMax = 2 - obj := corev1.ObjectReference{ - Kind: "GitRepository", - Namespace: "gitops-system", - Name: "webapp", - } + obj := &corev1.ConfigMap{} + obj.Namespace = "gitops-system" + obj.Name = "webapp" - err = eventRecorder.EventInfof(obj, nil, "sync", "sync %s", obj.Name) - require.NoError(t, err) + eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + require.Equal(t, 1, requestCount) } diff --git a/runtime/events/suite_test.go b/runtime/events/suite_test.go new file mode 100644 index 00000000..d53a94fa --- /dev/null +++ b/runtime/events/suite_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + "os" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/fluxcd/pkg/runtime/testenv" +) + +var ( + env *testenv.Environment + ctx = ctrl.SetupSignalHandler() +) + +func TestMain(m *testing.M) { + scheme := runtime.NewScheme() + utilruntime.Must(corev1.AddToScheme(scheme)) + + env = testenv.New( + testenv.WithScheme(scheme), + ) + + go func() { + fmt.Println("Starting the test environment") + if err := env.Start(ctx); err != nil { + panic(fmt.Sprintf("Failed to start the test environment manager: %v", err)) + } + }() + <-env.Manager.Elected() + + // Create test namespace. + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitops-system", + }, + } + if err := env.Client.Create(ctx, ns); err != nil { + panic(fmt.Sprintf("Failed to create gitops-system namespace: %v", err)) + } + + code := m.Run() + + if err := env.Client.Delete(ctx, ns); err != nil { + panic(fmt.Sprintf("Failed to delete gitops-system namespace: %v", err)) + } + + fmt.Println("Stopping the test environment") + if err := env.Stop(); err != nil { + panic(fmt.Sprintf("Failed to stop the test environment: %v", err)) + } + + os.Exit(code) +}