Skip to content

Commit

Permalink
Update Events to be a delegating event recorder
Browse files Browse the repository at this point in the history
Update controllers.Events to be a delegating event recorder compatible
with the upstream k8s EventRecorder interface. By default, when a
MakeEvents() is used to create an instance of Events, it contains a k8s
event recorder. Additional event recorders can be added to Events to
forward the events to all the EventRecorder implementations.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Oct 10, 2021
1 parent c300f37 commit 8ab2c39
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 49 deletions.
79 changes: 30 additions & 49 deletions runtime/controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@ limitations under the License.
package controller

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/runtime/events"
)

// Events is a helper struct that adds the capability of sending events to the Kubernetes API and an external event
// recorder, like the GitOps Toolkit notification-controller.
// Events is a delegating event recorder that adds the capability of sending events to the Kubernetes API and any other
// event recorder, like the GitOps Toolkit notification-controller.
//
// Use it by embedding it in your reconciler struct:
//
Expand All @@ -40,58 +34,45 @@ import (
// }
//
// Use MakeEvents to create a working Events value; in most cases the value needs to be initialised just once per
// controller, as the specialised logger and object reference data are gathered from the arguments provided to the
// Eventf method.
// controller. Additional event recorders can be appended to forward the events to them.
type Events struct {
Scheme *runtime.Scheme
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
Scheme *runtime.Scheme
EventRecorders []kuberecorder.EventRecorder
}

var _ kuberecorder.EventRecorder = &Events{}

// MakeEvents creates a new Events, with the Events.Scheme set to that of the given mgr and a newly initialised
// Events.EventRecorder for the given controllerName.
func MakeEvents(mgr ctrl.Manager, controllerName string, ext *events.Recorder) Events {
// kubernetes EventRecorder for the given controllerName. Any additional EventRecorders are appended to the
// Events.EventRecorders.
func MakeEvents(mgr ctrl.Manager, controllerName string, eventRecs ...kuberecorder.EventRecorder) Events {
// Add the kubernetes event recorder.
recorders := []kuberecorder.EventRecorder{
mgr.GetEventRecorderFor(controllerName),
}
// Add the other event recorders.
recorders = append(recorders, eventRecs...)
return Events{
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: ext,
Scheme: mgr.GetScheme(),
EventRecorders: recorders,
}
}

// Event emits a Kubernetes event, and forwards the event to the ExternalEventRecorder if configured.
// Use EventWithMeta or EventWithMetaf if you want to attach metadata to the external event.
func (e Events) Event(ctx context.Context, obj client.Object, severity, reason, msg string) {
e.EventWithMetaf(ctx, obj, nil, severity, reason, msg)
}

// Eventf emits a Kubernetes event, and forwards the event to the ExternalEventRecorder if configured.
// Use EventWithMeta or EventWithMetaf if you want to attach metadata to the external event.
func (e Events) Eventf(ctx context.Context, obj client.Object, severity, reason, msgFmt string, args ...interface{}) {
e.EventWithMetaf(ctx, obj, nil, severity, reason, msgFmt, args...)
}

// EventWithMeta emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured.
func (e Events) EventWithMeta(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msg string) {
e.EventWithMetaf(ctx, obj, metadata, severity, reason, msg)
// Event forwards the event to the event recorders.
// Use AnnotatedEventf to attach metadata to the event.
func (e Events) Event(obj runtime.Object, eventtype, reason, msg string) {
e.AnnotatedEventf(obj, nil, eventtype, reason, msg)
}

// EventWithMetaf emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured.
func (e Events) EventWithMetaf(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msgFmt string, args ...interface{}) {
if e.EventRecorder != nil {
e.EventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...)
}
if e.ExternalEventRecorder != nil {
e.ExternalEventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...)
}
// Eventf forwards the event to the event recorders.
// Use AnnotatedEventf to attach metadata to the event.
func (e Events) Eventf(obj runtime.Object, eventtype, reason, msgFmt string, args ...interface{}) {
e.AnnotatedEventf(obj, nil, eventtype, reason, msgFmt, args...)
}

// severityToEventType maps the given severity string to a corev1 EventType.
// In case of an unrecognised severity, EventTypeNormal is returned.
func severityToEventType(severity string) string {
switch severity {
case events.EventSeverityError:
return corev1.EventTypeWarning
default:
return corev1.EventTypeNormal
// AnnotatedEventf forwards the event and metadata to the event recorders.
func (e Events) AnnotatedEventf(obj runtime.Object, metadata map[string]string, eventtype, reason, msgFmt string, args ...interface{}) {
for _, er := range e.EventRecorders {
er.AnnotatedEventf(obj, metadata, eventtype, reason, msgFmt, args...)
}
}
69 changes: 69 additions & 0 deletions runtime/controller/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"

"github.com/fluxcd/pkg/runtime/events"
)

func TestEvent_AnnotatedEventf(t *testing.T) {
g := NewWithT(t)

// Run an external server to receive the events.
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
b, err := io.ReadAll(r.Body)
g.Expect(err).ToNot(HaveOccurred())

var payload events.Event
err = json.Unmarshal(b, &payload)
g.Expect(err).ToNot(HaveOccurred())

g.Expect(payload.InvolvedObject.Kind).To(Equal("ConfigMap"))
g.Expect(payload.InvolvedObject.Name).To(Equal("test-cm"))
g.Expect(payload.InvolvedObject.Namespace).To(Equal("default"))
g.Expect(payload.Metadata).To(HaveKeyWithValue("test", "true"))
g.Expect(payload.Reason).To(Equal("foo"))
}))
defer ts.Close()

// Create an external event recorder.
extEventRec, err := events.NewRecorder(env.GetScheme(), env.GetLogger(), ts.URL, "test-controller")
g.Expect(err).ToNot(HaveOccurred())

// Create an Events with the external event recorder.
event := MakeEvents(env, "test-controller", extEventRec)
obj := &corev1.ConfigMap{}
obj.Namespace = "default"
obj.Name = "test-cm"
meta := map[string]string{
"test": "true",
}

event.AnnotatedEventf(obj, meta, corev1.EventTypeNormal, "foo", "foo")
g.Expect(requestCount).To(BeNumerically(">", 0))
}
61 changes: 61 additions & 0 deletions runtime/controller/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"fmt"
"os"
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/fluxcd/pkg/runtime/testenv"
)

var (
env *testenv.Environment
ctx = ctrl.SetupSignalHandler()
)

func TestMain(m *testing.M) {
scheme := runtime.NewScheme()
utilruntime.Must(corev1.AddToScheme(scheme))

env = testenv.New(
testenv.WithScheme(scheme),
)

go func() {
fmt.Println("Starting the test environment")
if err := env.Start(ctx); err != nil {
panic(fmt.Sprintf("Failed to start the test environment manager: %v", err))
}
}()
<-env.Manager.Elected()

code := m.Run()

fmt.Println("Stopping the test environment")
if err := env.Stop(); err != nil {
panic(fmt.Sprintf("Failed to stop the test environment: %v", err))
}

os.Exit(code)
}

0 comments on commit 8ab2c39

Please sign in to comment.